Whamcloud - gitweb
- land b_hd_ver_recov
author tappro <tappro>
Wed, 18 Mar 2009 09:11:49 +0000 (09:11 +0000)
committer tappro <tappro>
Wed, 18 Mar 2009 09:11:49 +0000 (09:11 +0000)
56 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_object.c
lustre/include/Makefile.am
lustre/include/dt_object.h
lustre/include/linux/lustre_fsfilt.h
lustre/include/lu_target.h [new file with mode: 0644]
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/include/lustre_export.h
lustre/include/lustre_import.h
lustre/include/lustre_lib.h
lustre/include/lustre_log.h
lustre/include/lustre_net.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/liblustre/llite_lib.c
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/lvfs/fsfilt_ext3.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_capa.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/obdclass/dt_object.c
lustre/obdclass/genops.c
lustre/obdclass/llog_obd.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io_26.c
lustre/osd/osd_handler.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/service.c
lustre/ptlrpc/target.c [new file with mode: 0644]
lustre/ptlrpc/wiretest.c
lustre/tests/Makefile.am
lustre/tests/replay-single.sh
lustre/tests/replay-vbr.sh [new file with mode: 0644]
lustre/tests/sanity.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 8245d2f..82c89dc 100644 (file)
@@ -57,7 +57,7 @@
 # include <lustre_quota.h>
 #endif
 
-static struct obd_ops cmm_obd_device_ops = {
+struct obd_ops cmm_obd_device_ops = {
         .o_owner           = THIS_MODULE
 };
 
index fa52599..281dc0c 100644 (file)
@@ -362,6 +362,18 @@ static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
         RETURN(rc);
 }
 
+/**
+ * Report the version of \a mo by delegating to mo_version_get() on the
+ * object returned by md_object_next().
+ */
+static dt_obj_version_t cml_version_get(const struct lu_env *env,
+                                        struct md_object *mo)
+{
+        struct md_object *next = md_object_next(mo);
+
+        return mo_version_get(env, next);
+}
+
+/**
+ * Store \a version on \a mo by delegating to mo_version_set() on the
+ * object returned by md_object_next().
+ */
+static void cml_version_set(const struct lu_env *env, struct md_object *mo,
+                            dt_obj_version_t version)
+{
+        /* mo_version_set() returns void, so there is nothing to return. */
+        mo_version_set(env, md_object_next(mo), version);
+}
+
 static const struct md_object_operations cml_mo_ops = {
         .moo_permission    = cml_permission,
         .moo_attr_get      = cml_attr_get,
@@ -379,6 +391,8 @@ static const struct md_object_operations cml_mo_ops = {
         .moo_readlink      = cml_readlink,
         .moo_capa_get      = cml_capa_get,
         .moo_object_sync   = cml_object_sync,
+        .moo_version_get   = cml_version_get,
+        .moo_version_set   = cml_version_set,
         .moo_path          = cml_path,
 };
 
@@ -953,6 +967,18 @@ static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
         return -EFAULT;
 }
 
+/**
+ * Version query is not supported through the cmr operations table: the
+ * handler only invokes LBUG().
+ */
+static dt_obj_version_t cmr_version_get(const struct lu_env *env,
+                                        struct md_object *mo)
+{
+        LBUG();
+        /* NOTE(review): LBUG() is presumably non-returning; the explicit
+         * value keeps this non-void function well-defined should LBUG()
+         * ever return. */
+        return 0;
+}
+
+/**
+ * Version update is not supported through the cmr operations table: the
+ * handler only invokes LBUG().
+ */
+static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
+                            dt_obj_version_t version)
+{
+        LBUG();
+}
+
 static const struct md_object_operations cmr_mo_ops = {
         .moo_permission    = cmr_permission,
         .moo_attr_get      = cmr_attr_get,
@@ -970,6 +996,8 @@ static const struct md_object_operations cmr_mo_ops = {
         .moo_readlink      = cmr_readlink,
         .moo_capa_get      = cmr_capa_get,
         .moo_object_sync   = cmr_object_sync,
+        .moo_version_get   = cmr_version_get,
+        .moo_version_set   = cmr_version_set,
         .moo_path          = cmr_path,
 };
 
index d18e1a9..7496e1d 100644 (file)
@@ -46,4 +46,4 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
              lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
-            lu_ref.h cl_object.h lustre_acl.h lclient.h
+            lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h
index 4d33e83..160f47a 100644 (file)
@@ -242,6 +242,9 @@ struct dt_object_format {
 
 enum dt_format_type dt_mode_to_dft(__u32 mode);
 
+/** Version type. May differ in DMU and ldiskfs */
+typedef __u64 dt_obj_version_t;
+
 /**
  * Per-dt-object operations.
  */
@@ -371,6 +374,10 @@ struct dt_object_operations {
                                         struct lustre_capa *old,
                                         __u64 opc);
         int (*do_object_sync)(const struct lu_env *, struct dt_object *);
+        dt_obj_version_t (*do_version_get)(const struct lu_env *env,
+                                           struct dt_object *dt);
+        void (*do_version_set)(const struct lu_env *env, struct dt_object *dt,
+                               dt_obj_version_t new_version);
         /**
          * Get object info of next level. Currently, only get inode from osd.
          * This is only used by quota b=16542
@@ -572,6 +579,7 @@ struct dt_txn_callback {
         int (*dtc_txn_commit)(const struct lu_env *env,
                               struct thandle *txn, void *cookie);
         void            *dtc_cookie;
+        __u32            dtc_tag;
         struct list_head dtc_linkage;
 };
 
@@ -609,5 +617,40 @@ struct dt_object *dt_locate(const struct lu_env *env,
                             struct dt_device *dev,
                             const struct lu_fid *fid);
 
+/**
+ * Fetch the version of \a o through its do_version_get() operation.
+ * The operation must be provided (asserted).
+ */
+static inline dt_obj_version_t do_version_get(const struct lu_env *env,
+                                              struct dt_object *o)
+{
+        dt_obj_version_t version;
+
+        LASSERT(o->do_ops->do_version_get);
+        version = o->do_ops->do_version_get(env, o);
+        return version;
+}
+
+/**
+ * Set the version of \a o to \a v through its do_version_set() operation.
+ * The operation must be provided (asserted).
+ */
+static inline void do_version_set(const struct lu_env *env,
+                                  struct dt_object *o, dt_obj_version_t v)
+{
+        LASSERT(o->do_ops->do_version_set);
+        /* The operation returns void, so call it as a plain statement. */
+        o->do_ops->do_version_set(env, o, v);
+}
+
+int dt_record_read(const struct lu_env *env, struct dt_object *dt,
+                   struct lu_buf *buf, loff_t *pos);
+int dt_record_write(const struct lu_env *env, struct dt_object *dt,
+                    const struct lu_buf *buf, loff_t *pos, struct thandle *th);
+
+
+/**
+ * Start a transaction on device \a d with parameters \a p by calling the
+ * device's dt_trans_start() operation, which must be provided (asserted).
+ *
+ * \return whatever handle the operation returns
+ */
+static inline struct thandle *dt_trans_start(const struct lu_env *env,
+                                             struct dt_device *d,
+                                             struct txn_param *p)
+{
+        struct thandle *th;
+
+        LASSERT(d->dd_ops->dt_trans_start);
+        th = d->dd_ops->dt_trans_start(env, d, p);
+        return th;
+}
+
+/**
+ * Stop transaction \a th by calling the dt_trans_stop() operation of device
+ * \a d, which must be provided (asserted).
+ *
+ * \a d is used only to look the operation up; the operation itself receives
+ * \a env and \a th.
+ */
+static inline void dt_trans_stop(const struct lu_env *env,
+                                 struct dt_device *d,
+                                 struct thandle *th)
+{
+        LASSERT(d->dd_ops->dt_trans_stop);
+        /* This wrapper is void: call the operation as a plain statement. */
+        d->dd_ops->dt_trans_stop(env, th);
+}
 /** @} dt */
 #endif /* __LUSTRE_DT_OBJECT_H */
index 484d267..b9e01e6 100644 (file)
@@ -495,7 +495,7 @@ static inline __u64 fsfilt_set_version(struct obd_device *obd,
 static inline __u64 fsfilt_get_version(struct obd_device *obd,
                                        struct inode *inode)
 {
-        if (obd->obd_fsops->fs_set_version)
+        if (obd->obd_fsops->fs_get_version)
                 return obd->obd_fsops->fs_get_version(inode);
         return -EOPNOTSUPP;
 }
diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h
new file mode 100644 (file)
index 0000000..aae74e9
--- /dev/null
@@ -0,0 +1,80 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ */
+
+#ifndef _LUSTRE_LU_TARGET_H
+#define _LUSTRE_LU_TARGET_H
+
+#include <dt_object.h>
+#include <lustre_disk.h>
+
+/**
+ * Target state operated on by lut_init() and lut_fini(): the last_rcvd
+ * object and its server data, the last transaction number and the bitmap
+ * of known clients.
+ */
+struct lu_target {
+        /** obd device (NOTE(review): presumably the device this target
+         * serves -- confirm against lut_init()) */
+        struct obd_device       *lut_obd;
+        /** dt device (NOTE(review): presumably the underlying storage
+         * device passed to lut_init() -- confirm) */
+        struct dt_device        *lut_bottom;
+        /** last_rcvd file */
+        struct dt_object        *lut_last_rcvd;
+        /** transaction callbacks */
+        struct dt_txn_callback   lut_txn_cb;
+        /** server data in last_rcvd file */
+        struct lr_server_data    lut_lsd;
+        /** Server last transaction number */
+        __u64                    lut_last_transno;
+        /** Lock protecting last transaction number */
+        spinlock_t               lut_translock;
+        /** Lock protecting client bitmap */
+        spinlock_t               lut_client_bitmap_lock;
+        /** Bitmap of known clients */
+        unsigned long            lut_client_bitmap[LR_CLIENT_BITMAP_SIZE];
+        /** Number of mounts */
+        __u64                    lut_mount_count;
+        /** NOTE(review): presumably the age after which an export counts
+         * as stale; units are not visible here -- confirm */
+        __u32                    lut_stale_export_age;
+        /** NOTE(review): presumably protects lsd_trans_table inside
+         * lut_lsd -- confirm against the users of this lock */
+        spinlock_t               lut_trans_table_lock;
+};
+
+/**
+ * Callback signature: receives the target, a transaction number, an opaque
+ * data pointer and an error value.
+ */
+typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno,
+                         void *data, int err);
+/**
+ * Pairs a lut_cb_t function pointer with a data pointer.
+ * NOTE(review): presumably lut_cb_data is what gets passed back as \a data
+ * when lut_cb_func is invoked -- confirm against the callers.
+ */
+struct lut_commit_cb {
+        lut_cb_t  lut_cb_func;
+        void     *lut_cb_data;
+};
+
+void lut_boot_epoch_update(struct lu_target *);
+void lut_cb_last_committed(struct lu_target *, __u64, void *, int);
+void lut_cb_client(struct lu_target *, __u64, void *, int);
+int lut_init(const struct lu_env *, struct lu_target *,
+             struct obd_device *, struct dt_device *);
+void lut_fini(const struct lu_env *, struct lu_target *);
+
+#endif /* _LUSTRE_LU_TARGET_H */
index 74623ae..176e21a 100644 (file)
@@ -785,7 +785,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_RMT_CLIENT_FORCE | \
                                 OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
                                 OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | \
-                                LRU_RESIZE_CONNECT_FLAG | \
+                                LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR | \
                                 OBD_CONNECT_LOV_V3)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
@@ -795,9 +795,9 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \
                                 OBD_CONNECT_CHANGE_QS | \
                                 OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_RMT_CLIENT | \
-                                OBD_CONNECT_RMT_CLIENT_FORCE | \
+                                OBD_CONNECT_RMT_CLIENT_FORCE | OBD_CONNECT_VBR | \
                                 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
-                               OBD_CONNECT_GRANT_SHRINK)
+                                OBD_CONNECT_GRANT_SHRINK)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
 
index df9fddd..9a81b46 100644 (file)
@@ -192,6 +192,11 @@ struct lustre_mount_data {
 
 /****************** last_rcvd file *********************/
 
+/** version recovery epoch */
+#define LR_EPOCH_BITS   32
+#define lr_epoch(a) ((a) >> LR_EPOCH_BITS)
+#define LR_EXPIRE_INTERVALS 16 /**< number of intervals to track transno */
+
 #define LR_SERVER_SIZE   512
 #define LR_CLIENT_START 8192
 #define LR_CLIENT_SIZE   128
@@ -211,6 +216,8 @@ struct lustre_mount_data {
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
+#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long))
+
 /** COMPAT_146: this is an OST (temporary) */
 #define OBD_COMPAT_OST          0x00000002
 /** COMPAT_146: this is an MDT (temporary) */
@@ -254,7 +261,13 @@ struct lr_server_data {
         __u8  lsd_peeruuid[40];    /* UUID of MDS associated with this OST */
         __u32 lsd_ost_index;       /* index number of OST in LOV */
         __u32 lsd_mdt_index;       /* index number of MDT in LMV */
-        __u8  lsd_padding[LR_SERVER_SIZE - 148];
+        __u32 lsd_start_epoch;     /* VBR: start epoch from last boot */
+        /** transaction values since lsd_trans_table_time */
+        __u64 lsd_trans_table[LR_EXPIRE_INTERVALS];
+        /** start point of transno table below */
+        __u32 lsd_trans_table_time; /* time of first slot in table above */
+        __u32 lsd_expire_intervals; /* LR_EXPIRE_INTERVALS */
+        __u8  lsd_padding[LR_SERVER_SIZE - 288];
 };
 
 /* Data stored per client in the last_rcvd file.  In le32 order. */
@@ -269,9 +282,120 @@ struct lsd_client_data {
         __u64 lcd_last_close_xid;     /* xid for the last transaction */
         __u32 lcd_last_close_result;  /* result from last RPC */
         __u32 lcd_last_close_data;    /* per-op data */
-        __u8  lcd_padding[LR_CLIENT_SIZE - 88];
+        /* VBR: last versions */
+        __u64 lcd_pre_versions[4];
+        __u32 lcd_last_epoch;
+        /** orphans handling for delayed export rely on that */
+        __u32 lcd_first_epoch;
+        __u8  lcd_padding[LR_CLIENT_SIZE - 128];
 };
 
+/* last_rcvd handling */
+/**
+ * Convert last_rcvd server data from little-endian to CPU byte order.
+ *
+ * The two UUID byte arrays are copied verbatim; every integer field goes
+ * through the leNN_to_cpu() helper matching its width. lsd_padding is not
+ * touched.
+ *
+ * \param buf  source record in little-endian order; only read
+ * \param lsd  destination record, filled in CPU order
+ */
+static inline void lsd_le_to_cpu(const struct lr_server_data *buf,
+                                 struct lr_server_data *lsd)
+{
+        int i;
+
+        memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof(lsd->lsd_uuid));
+        lsd->lsd_last_transno     = le64_to_cpu(buf->lsd_last_transno);
+        lsd->lsd_compat14         = le64_to_cpu(buf->lsd_compat14);
+        lsd->lsd_mount_count      = le64_to_cpu(buf->lsd_mount_count);
+        lsd->lsd_feature_compat   = le32_to_cpu(buf->lsd_feature_compat);
+        lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat);
+        lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat);
+        lsd->lsd_server_size      = le32_to_cpu(buf->lsd_server_size);
+        lsd->lsd_client_start     = le32_to_cpu(buf->lsd_client_start);
+        lsd->lsd_client_size      = le16_to_cpu(buf->lsd_client_size);
+        lsd->lsd_subdir_count     = le16_to_cpu(buf->lsd_subdir_count);
+        lsd->lsd_catalog_oid      = le64_to_cpu(buf->lsd_catalog_oid);
+        lsd->lsd_catalog_ogen     = le32_to_cpu(buf->lsd_catalog_ogen);
+        memcpy(lsd->lsd_peeruuid, buf->lsd_peeruuid, sizeof(lsd->lsd_peeruuid));
+        lsd->lsd_ost_index        = le32_to_cpu(buf->lsd_ost_index);
+        lsd->lsd_mdt_index        = le32_to_cpu(buf->lsd_mdt_index);
+        lsd->lsd_start_epoch      = le32_to_cpu(buf->lsd_start_epoch);
+        /* One slot per expire interval. */
+        for (i = 0; i < LR_EXPIRE_INTERVALS; i++)
+                lsd->lsd_trans_table[i] = le64_to_cpu(buf->lsd_trans_table[i]);
+        lsd->lsd_trans_table_time = le32_to_cpu(buf->lsd_trans_table_time);
+        lsd->lsd_expire_intervals = le32_to_cpu(buf->lsd_expire_intervals);
+}
+
+/**
+ * Convert last_rcvd server data from CPU to little-endian byte order.
+ *
+ * The two UUID byte arrays are copied verbatim; every integer field goes
+ * through the cpu_to_leNN() helper matching its width. lsd_padding is not
+ * touched.
+ *
+ * \param lsd  source record in CPU order; only read
+ * \param buf  destination record, filled in little-endian order
+ */
+static inline void lsd_cpu_to_le(const struct lr_server_data *lsd,
+                                 struct lr_server_data *buf)
+{
+        int i;
+
+        memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof(buf->lsd_uuid));
+        buf->lsd_last_transno     = cpu_to_le64(lsd->lsd_last_transno);
+        buf->lsd_compat14         = cpu_to_le64(lsd->lsd_compat14);
+        buf->lsd_mount_count      = cpu_to_le64(lsd->lsd_mount_count);
+        buf->lsd_feature_compat   = cpu_to_le32(lsd->lsd_feature_compat);
+        buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat);
+        buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat);
+        buf->lsd_server_size      = cpu_to_le32(lsd->lsd_server_size);
+        buf->lsd_client_start     = cpu_to_le32(lsd->lsd_client_start);
+        buf->lsd_client_size      = cpu_to_le16(lsd->lsd_client_size);
+        buf->lsd_subdir_count     = cpu_to_le16(lsd->lsd_subdir_count);
+        buf->lsd_catalog_oid      = cpu_to_le64(lsd->lsd_catalog_oid);
+        buf->lsd_catalog_ogen     = cpu_to_le32(lsd->lsd_catalog_ogen);
+        memcpy(buf->lsd_peeruuid, lsd->lsd_peeruuid, sizeof(buf->lsd_peeruuid));
+        buf->lsd_ost_index        = cpu_to_le32(lsd->lsd_ost_index);
+        buf->lsd_mdt_index        = cpu_to_le32(lsd->lsd_mdt_index);
+        buf->lsd_start_epoch      = cpu_to_le32(lsd->lsd_start_epoch);
+        /* One slot per expire interval. */
+        for (i = 0; i < LR_EXPIRE_INTERVALS; i++)
+                buf->lsd_trans_table[i] = cpu_to_le64(lsd->lsd_trans_table[i]);
+        buf->lsd_trans_table_time = cpu_to_le32(lsd->lsd_trans_table_time);
+        buf->lsd_expire_intervals = cpu_to_le32(lsd->lsd_expire_intervals);
+}
+
+/**
+ * Convert per-client last_rcvd data from little-endian to CPU byte order.
+ *
+ * The UUID byte array is copied verbatim; every integer field goes through
+ * the leNN_to_cpu() helper matching its width. lcd_padding is not touched.
+ *
+ * \param buf  source record in little-endian order; only read
+ * \param lcd  destination record, filled in CPU order
+ */
+static inline void lcd_le_to_cpu(const struct lsd_client_data *buf,
+                                 struct lsd_client_data *lcd)
+{
+        unsigned int i;
+
+        memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof(lcd->lcd_uuid));
+        lcd->lcd_last_transno       = le64_to_cpu(buf->lcd_last_transno);
+        lcd->lcd_last_xid           = le64_to_cpu(buf->lcd_last_xid);
+        lcd->lcd_last_result        = le32_to_cpu(buf->lcd_last_result);
+        lcd->lcd_last_data          = le32_to_cpu(buf->lcd_last_data);
+        lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno);
+        lcd->lcd_last_close_xid     = le64_to_cpu(buf->lcd_last_close_xid);
+        lcd->lcd_last_close_result  = le32_to_cpu(buf->lcd_last_close_result);
+        lcd->lcd_last_close_data    = le32_to_cpu(buf->lcd_last_close_data);
+        /* The bound is taken from the array itself, so the loop follows the
+         * declared size of lcd_pre_versions. */
+        for (i = 0; i < sizeof(lcd->lcd_pre_versions) /
+                        sizeof(lcd->lcd_pre_versions[0]); i++)
+                lcd->lcd_pre_versions[i] =
+                        le64_to_cpu(buf->lcd_pre_versions[i]);
+        lcd->lcd_last_epoch         = le32_to_cpu(buf->lcd_last_epoch);
+        lcd->lcd_first_epoch        = le32_to_cpu(buf->lcd_first_epoch);
+}
+
+/**
+ * Convert per-client last_rcvd data from CPU to little-endian byte order.
+ *
+ * The UUID byte array is copied verbatim; every integer field goes through
+ * the cpu_to_leNN() helper matching its width. lcd_padding is not touched.
+ *
+ * \param lcd  source record in CPU order; only read
+ * \param buf  destination record, filled in little-endian order
+ */
+static inline void lcd_cpu_to_le(const struct lsd_client_data *lcd,
+                                 struct lsd_client_data *buf)
+{
+        unsigned int i;
+
+        /* Size the copy by the destination, as the sibling helpers do. */
+        memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof(buf->lcd_uuid));
+        buf->lcd_last_transno       = cpu_to_le64(lcd->lcd_last_transno);
+        buf->lcd_last_xid           = cpu_to_le64(lcd->lcd_last_xid);
+        buf->lcd_last_result        = cpu_to_le32(lcd->lcd_last_result);
+        buf->lcd_last_data          = cpu_to_le32(lcd->lcd_last_data);
+        buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno);
+        buf->lcd_last_close_xid     = cpu_to_le64(lcd->lcd_last_close_xid);
+        buf->lcd_last_close_result  = cpu_to_le32(lcd->lcd_last_close_result);
+        buf->lcd_last_close_data    = cpu_to_le32(lcd->lcd_last_close_data);
+        /* The bound is taken from the array itself, so the loop follows the
+         * declared size of lcd_pre_versions. */
+        for (i = 0; i < sizeof(buf->lcd_pre_versions) /
+                        sizeof(buf->lcd_pre_versions[0]); i++)
+                buf->lcd_pre_versions[i] =
+                        cpu_to_le64(lcd->lcd_pre_versions[i]);
+        buf->lcd_last_epoch         = cpu_to_le32(lcd->lcd_last_epoch);
+        buf->lcd_first_epoch        = cpu_to_le32(lcd->lcd_first_epoch);
+}
+
+/** Return the larger of lcd_last_transno and lcd_last_close_transno. */
+static inline __u64 lcd_last_transno(struct lsd_client_data *lcd)
+{
+        if (lcd->lcd_last_transno > lcd->lcd_last_close_transno)
+                return lcd->lcd_last_transno;
+
+        return lcd->lcd_last_close_transno;
+}
+
+/** Return the larger of lcd_last_xid and lcd_last_close_xid. */
+static inline __u64 lcd_last_xid(struct lsd_client_data *lcd)
+{
+        if (lcd->lcd_last_xid > lcd->lcd_last_close_xid)
+                return lcd->lcd_last_xid;
+
+        return lcd->lcd_last_close_xid;
+}
 
 /****************** superblock additional info *********************/
 #ifdef __KERNEL__
@@ -311,7 +435,6 @@ struct lustre_mount_info {
 /****************** prototypes *********************/
 
 #ifdef __KERNEL__
-#include <obd_class.h>
 
 /* obd_mount.c */
 void lustre_register_client_fill_super(int (*cfs)(struct super_block *sb));
@@ -319,10 +442,6 @@ void lustre_register_kill_super_cb(void (*cfs)(struct super_block *sb));
 
 
 int lustre_common_put_super(struct super_block *sb);
-int lustre_process_log(struct super_block *sb, char *logname,
-                     struct config_llog_instance *cfg);
-int lustre_end_log(struct super_block *sb, char *logname,
-                       struct config_llog_instance *cfg);
 struct lustre_mount_info *server_get_mount(const char *name);
 struct lustre_mount_info *server_get_mount_2(const char *name);
 int server_put_mount(const char *name, struct vfsmount *mnt);
index 6ee97d6..33b5248 100644 (file)
 #include <lprocfs_status.h>
 #include <class_hash.h>
 
-/* Data stored per client in the last_rcvd file.  In le32 order. */
 struct mds_client_data;
 struct mdt_client_data;
 struct mds_idmap_table;
 struct mdt_idmap_table;
 
+/**
+ * Per-export last_rcvd bookkeeping. Embedded as the first member of
+ * struct mdt_export_data (med_led) and struct filter_export_data (fed_led),
+ * and also present on its own in the obd_export union (eu_target_data).
+ */
+struct lu_export_data {
+        /** Protects led_lcd below */
+        struct semaphore        led_lcd_lock;
+        /** Per-client data for each export */
+        struct lsd_client_data *led_lcd;
+        /** Offset of record in last_rcvd file */
+        loff_t                  led_lr_off;
+        /** Client index in last_rcvd file */
+        int                     led_lr_idx;
+};
+
 struct mdt_export_data {
+        struct lu_export_data   med_led;
         struct list_head        med_open_head;
         spinlock_t              med_open_lock; /* lock med_open_head, mfd_list*/
-        struct semaphore        med_lcd_lock;
-        struct lsd_client_data *med_lcd;
         __u64                   med_ibits_known;
-        loff_t                  med_lr_off;
-        int                     med_lr_idx;
         struct semaphore           med_idmap_sem;
         struct lustre_idmap_table *med_idmap;
 };
 
+#define med_lcd_lock    med_led.led_lcd_lock
+#define med_lcd         med_led.led_lcd
+#define med_lr_off      med_led.led_lr_off
+#define med_lr_idx      med_led.led_lr_idx
+
 struct osc_creator {
         spinlock_t              oscc_lock;
         struct list_head        oscc_list;
@@ -82,10 +94,8 @@ struct ec_export_data { /* echo client */
 
 /* In-memory access to client data from OST struct */
 struct filter_export_data {
-        spinlock_t                 fed_lock;      /* protects fed_open_head */
-        struct lsd_client_data    *fed_lcd;
-        loff_t                     fed_lr_off;
-        int                        fed_lr_idx;
+        struct lu_export_data      fed_led;
+        spinlock_t                 fed_lock;     /**< protects fed_mod_list */
         long                       fed_dirty;    /* in bytes */
         long                       fed_grant;    /* in bytes */
         struct list_head           fed_mod_list; /* files being modified */
@@ -94,6 +104,11 @@ struct filter_export_data {
         __u32                      fed_group;
 };
 
+#define fed_lcd_lock    fed_led.led_lcd_lock
+#define fed_lcd         fed_led.led_lcd
+#define fed_lr_off      fed_led.led_lr_off
+#define fed_lr_idx      fed_led.led_lr_idx
+
 typedef struct nid_stat_uuid {
         struct list_head ns_uuid_list;
         struct obd_uuid  ns_uuid;
@@ -137,7 +152,10 @@ struct obd_export {
         lustre_hash_t            *exp_lock_hash; /* existing lock hash */
         spinlock_t                exp_lock_hash_lock;
         struct list_head          exp_outstanding_replies;
-        time_t                    exp_last_request_time;
+        struct list_head          exp_uncommitted_replies;
+        spinlock_t                exp_uncommitted_replies_lock;
+        __u64                     exp_last_committed;
+        cfs_time_t                exp_last_request_time;
         struct list_head          exp_req_replay_queue;
         spinlock_t                exp_lock; /* protects flags int below */
         /* ^ protects exp_outstanding_replies too */
@@ -147,6 +165,10 @@ struct obd_export {
                                   exp_in_recovery:1,
                                   exp_disconnected:1,
                                   exp_connecting:1,
+                                  /** VBR: export missed recovery */
+                                  exp_delayed:1,
+                                  /** VBR: failed version checking */
+                                  exp_vbr_failed:1,
                                   exp_req_replay_needed:1,
                                   exp_lock_replay_needed:1,
                                   exp_need_sync:1,
@@ -161,16 +183,25 @@ struct obd_export {
         cfs_time_t                exp_flvr_expire[2];   /* seconds */
 
         union {
+                struct lu_export_data     eu_target_data;
                 struct mdt_export_data    eu_mdt_data;
                 struct filter_export_data eu_filter_data;
                 struct ec_export_data     eu_ec_data;
         } u;
 };
 
+#define exp_target_data u.eu_target_data
 #define exp_mdt_data    u.eu_mdt_data
 #define exp_filter_data u.eu_filter_data
 #define exp_ec_data     u.eu_ec_data
 
+/**
+ * Tell whether the export's last request time plus \a age lies before the
+ * current time as reported by cfs_time_current_sec().
+ *
+ * Only meaningful for delayed exports: exp_delayed must be set (asserted).
+ */
+static inline int exp_expired(struct obd_export *exp, cfs_duration_t age)
+{
+        cfs_time_t deadline;
+
+        LASSERT(exp->exp_delayed);
+        deadline = cfs_time_add(exp->exp_last_request_time, age);
+        return cfs_time_before(deadline, cfs_time_current_sec());
+}
+
 static inline int exp_connect_cancelset(struct obd_export *exp)
 {
         LASSERT(exp != NULL);
@@ -197,6 +228,13 @@ static inline int client_is_remote(struct obd_export *exp)
                   OBD_CONNECT_RMT_CLIENT);
 }
 
+/**
+ * Tell whether OBD_CONNECT_VBR is set in the export's connect flags.
+ * The export and its exp_connection must be non-NULL (asserted).
+ *
+ * \retval 1 flag is set
+ * \retval 0 flag is clear
+ */
+static inline int exp_connect_vbr(struct obd_export *exp)
+{
+        LASSERT(exp != NULL);
+        LASSERT(exp->exp_connection);
+        return (exp->exp_connect_flags & OBD_CONNECT_VBR) != 0;
+}
+
 static inline int imp_connect_lru_resize(struct obd_import *imp)
 {
         struct obd_connect_data *ocd;
index f3fd466..5f82e8f 100644 (file)
@@ -164,6 +164,9 @@ struct obd_import {
                                   imp_server_timeout:1,   /* use 1/2 timeout on MDS' OSCs */
                                   imp_initial_recov:1,    /* retry the initial connection */  
                                   imp_initial_recov_bk:1, /* turn off init_recov after trying all failover nids */
+                                  imp_delayed_recovery:1, /* VBR: imp in delayed recovery */
+                                  imp_no_lock_replay:1,   /* VBR: if gap was found then no lock replays */
+                                  imp_vbr_failed:1,       /* recovery by versions was failed */
                                   imp_force_verify:1,     /* force an immidiate ping */
                                   imp_pingable:1,         /* pingable */
                                   imp_resend_replay:1,    /* resend for replay */
index c99450e..bbcdd91 100644 (file)
@@ -62,9 +62,8 @@ void ll_get_random_bytes(void *buf, int size);
 
 /* target.c */
 struct ptlrpc_request;
-struct recovd_data;
-struct recovd_obd;
 struct obd_export;
+struct lu_target;
 #include <lustre_ha.h>
 #include <lustre_net.h>
 #include <lvfs.h>
@@ -89,8 +88,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req);
 #define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
 
 void target_cancel_recovery_timer(struct obd_device *obd);
-int target_start_recovery_thread(struct obd_device *obd,
-                                 svc_handler_t handler);
 void target_stop_recovery_thread(struct obd_device *obd);
 void target_cleanup_recovery(struct obd_device *obd);
 int target_queue_recovery_request(struct ptlrpc_request *req,
index eddaeef..ad6e6ca 100644 (file)
@@ -61,7 +61,7 @@
 #error Unsupported operating system.
 #endif
 
-#include <obd.h>
+#include <obd_class.h>
 #include <obd_ost.h>
 #include <lustre/lustre_idl.h>
 
@@ -294,7 +294,7 @@ struct llog_commit_master {
         /**
          * The refcount for lcm
          */
-         atomic_t                 lcm_refcount;
+         atomic_t                  lcm_refcount;
         /**
          * Thread control structure. Used for control commit thread.
          */
@@ -313,7 +313,7 @@ struct llog_commit_master {
         char                       lcm_name[LCM_NAME_SIZE];
 };
 
-static inline struct llog_commit_master 
+static inline struct llog_commit_master
 *lcm_get(struct llog_commit_master *lcm)
 {
         LASSERT(atomic_read(&lcm->lcm_refcount) > 0);
@@ -321,13 +321,13 @@ static inline struct llog_commit_master
         return lcm;
 }
 
-static inline void 
+static inline void
 lcm_put(struct llog_commit_master *lcm)
 {
         if (!atomic_dec_and_test(&lcm->lcm_refcount)) {
                 return ;
         }
-        OBD_FREE_PTR(lcm);     
+        OBD_FREE_PTR(lcm);
 }
 
 struct llog_canceld_ctxt {
@@ -675,4 +675,9 @@ static inline int llog_connect(struct llog_ctxt *ctxt,
         RETURN(rc);
 }
 
+int lustre_process_log(struct super_block *sb, char *logname,
+                       struct config_llog_instance *cfg);
+int lustre_end_log(struct super_block *sb, char *logname,
+                   struct config_llog_instance *cfg);
+
 #endif
index 70c3150..30bf3d8 100644 (file)
@@ -1004,7 +1004,7 @@ struct ptlrpc_service_conf {
 /* ptlrpc/service.c */
 void ptlrpc_save_lock (struct ptlrpc_request *req,
                        struct lustre_handle *lock, int mode, int no_ack);
-void ptlrpc_commit_replies (struct obd_device *obd);
+void ptlrpc_commit_replies(struct obd_export *exp);
 void ptlrpc_dispatch_difficult_reply (struct ptlrpc_reply_state *rs);
 void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
@@ -1105,6 +1105,7 @@ void lustre_msg_add_version(struct lustre_msg *msg, int version);
 __u32 lustre_msg_get_opc(struct lustre_msg *msg);
 __u64 lustre_msg_get_last_xid(struct lustre_msg *msg);
 __u64 lustre_msg_get_last_committed(struct lustre_msg *msg);
+__u64 *lustre_msg_get_versions(struct lustre_msg *msg);
 __u64 lustre_msg_get_transno(struct lustre_msg *msg);
 __u64 lustre_msg_get_slv(struct lustre_msg *msg);
 __u32 lustre_msg_get_limit(struct lustre_msg *msg);
@@ -1123,6 +1124,7 @@ void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
 void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
 void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid);
 void lustre_msg_set_last_committed(struct lustre_msg *msg,__u64 last_committed);
+void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions);
 void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno);
 void lustre_msg_set_status(struct lustre_msg *msg, __u32 status);
 void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
@@ -1271,6 +1273,7 @@ int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
                            int priority);
 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
 int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
+void client_destroy_import(struct obd_import *imp);
 
 /* ptlrpc/pinger.c */
 enum timeout_event {
index 8832552..0a16b78 100644 (file)
@@ -56,7 +56,7 @@
 /*
  * super-class definitions.
  */
-#include <lu_object.h>
+#include <dt_object.h>
 #include <lvfs.h>
 
 struct md_device;
@@ -246,7 +246,10 @@ struct md_object_operations {
                             struct lustre_capa *, int renewal);
 
         int (*moo_object_sync)(const struct lu_env *, struct md_object *);
-
+        dt_obj_version_t (*moo_version_get)(const struct lu_env *,
+                                            struct md_object *);
+        void (*moo_version_set)(const struct lu_env *, struct md_object *,
+                                dt_obj_version_t);
         int (*moo_path)(const struct lu_env *env, struct md_object *obj,
                         char *path, int pathlen, __u64 *recno, int *linkno);
 };
@@ -704,6 +707,20 @@ static inline int mo_object_sync(const struct lu_env *env, struct md_object *m)
         return m->mo_ops->moo_object_sync(env, m);
 }
 
+static inline dt_obj_version_t mo_version_get(const struct lu_env *env,
+                                              struct md_object *m)
+{
+        LASSERT(m->mo_ops->moo_version_get); /* method must be set */
+        return m->mo_ops->moo_version_get(env, m); /* forward to ops */
+}
+
+static inline void mo_version_set(const struct lu_env *env,
+                                  struct md_object *m, dt_obj_version_t ver)
+{
+        LASSERT(m->mo_ops->moo_version_set); /* method must be set */
+        m->mo_ops->moo_version_set(env, m, ver); /* void: nothing to return */
+}
+
 static inline int mdo_lookup(const struct lu_env *env,
                              struct md_object *p,
                              const struct lu_name *lname,
index dc7745e..a894998 100644 (file)
@@ -60,7 +60,7 @@
 #define IOC_MDC_MAX_NR       50
 
 #include <lustre/lustre_idl.h>
-#include <lu_object.h>
+#include <lu_target.h>
 #include <lu_ref.h>
 #include <lustre_lib.h>
 #include <lustre_export.h>
@@ -247,6 +247,20 @@ struct ost_server_data;
 /* hold common fields for "target" device */
 struct obd_device_target {
         struct super_block       *obt_sb;
+        /** last_rcvd file */
+        struct file              *obt_rcvd_filp;
+        /** server data in last_rcvd file */
+        struct lr_server_data    *obt_lsd;
+        /** Lock protecting client bitmap */
+        spinlock_t                obt_client_bitmap_lock;
+        /** Bitmap of known clients */
+        unsigned long            *obt_client_bitmap;
+        /** Server last transaction number */
+        __u64                     obt_last_transno;
+        /** Lock protecting last transaction number */
+        spinlock_t                obt_translock;
+        /** Number of mounts */
+        __u64                     obt_mount_count;
         atomic_t                  obt_quotachecking;
         struct lustre_quota_ctxt  obt_qctxt;
         lustre_quota_version_t    obt_qfmt;
@@ -288,6 +302,7 @@ struct filter_ext {
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
+        struct lu_target     fo_lut;
         const char          *fo_fstype;
         struct vfsmount     *fo_vfsmnt;
 
@@ -300,12 +315,7 @@ struct filter_obd {
 
 
         spinlock_t           fo_objidlock;      /* protect fo_lastobjid */
-        spinlock_t           fo_translock;      /* protect fsd_last_transno */
-        struct file         *fo_rcvd_filp;
         struct file         *fo_health_check_filp;
-        struct lr_server_data *fo_fsd;
-        unsigned long       *fo_last_rcvd_slots;
-        __u64                fo_mount_count;
 
         unsigned long        fo_destroys_in_progress;
         struct semaphore     fo_create_locks[FILTER_SUBDIR_COUNT];
@@ -372,6 +382,12 @@ struct filter_obd {
         int                      fo_sec_level;
 };
 
+#define fo_translock            fo_obt.obt_translock
+#define fo_rcvd_filp            fo_obt.obt_rcvd_filp
+#define fo_fsd                  fo_obt.obt_lsd
+#define fo_last_rcvd_slots      fo_obt.obt_client_bitmap
+#define fo_mount_count          fo_obt.obt_mount_count
+
 struct timeout_item {
         enum timeout_event ti_event;
         cfs_time_t         ti_timeout;
@@ -380,6 +396,7 @@ struct timeout_item {
         struct list_head   ti_obd_list;
         struct list_head   ti_chain;
 };
+
 #define OSC_MAX_RIF_DEFAULT       8
 #define OSC_MAX_RIF_MAX         256
 #define OSC_MAX_DIRTY_DEFAULT  (OSC_MAX_RIF_DEFAULT * 4)
@@ -516,15 +533,10 @@ struct mds_obd {
         cfs_dentry_t                    *mds_fid_de;
         int                              mds_max_mdsize;
         int                              mds_max_cookiesize;
-        struct file                     *mds_rcvd_filp;
-        spinlock_t                       mds_transno_lock;
-        __u64                            mds_last_transno;
-        __u64                            mds_mount_count;
         __u64                            mds_io_epoch;
         unsigned long                    mds_atime_diff;
         struct semaphore                 mds_epoch_sem;
         struct ll_fid                    mds_rootfid;
-        struct lr_server_data           *mds_server_data;
         cfs_dentry_t                    *mds_pending_dir;
         cfs_dentry_t                    *mds_logs_dir;
         cfs_dentry_t                    *mds_objects_dir;
@@ -548,8 +560,6 @@ struct mds_obd {
         __u32                            mds_lov_objid_lastidx;
 
         struct file                     *mds_health_check_filp;
-        unsigned long                   *mds_client_bitmap;
-//        struct upcall_cache             *mds_group_hash;
 
         struct lustre_quota_info         mds_quota_info;
         struct semaphore                 mds_qonoff_sem;
@@ -570,6 +580,13 @@ struct mds_obd {
         struct rw_semaphore              mds_notify_lock;
 };
 
+#define mds_transno_lock         mds_obt.obt_translock
+#define mds_rcvd_filp            mds_obt.obt_rcvd_filp
+#define mds_server_data          mds_obt.obt_lsd
+#define mds_client_bitmap        mds_obt.obt_client_bitmap
+#define mds_mount_count          mds_obt.obt_mount_count
+#define mds_last_transno         mds_obt.obt_last_transno
+
 /* lov objid */
 extern __u32 mds_max_ost_index;
 
@@ -829,6 +846,8 @@ struct obd_trans_info {
         /* initial thread handling transaction */
         struct ptlrpc_thread *   oti_thread;
         __u32                    oti_conn_cnt;
+        /** VBR: versions */
+        __u64                    oti_pre_version;
 
         struct obd_uuid         *oti_ost_uuid;
 };
@@ -844,7 +863,15 @@ static inline void oti_init(struct obd_trans_info *oti,
                 return;
 
         oti->oti_xid = req->rq_xid;
+        /** VBR: take versions from request */
+        if (req->rq_reqmsg != NULL &&
+            lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg);
+                oti->oti_pre_version = pre_version ? pre_version[0] : 0;
+                oti->oti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        }
 
+        /** called from mds_create_objects */
         if (req->rq_repmsg != NULL)
                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
         oti->oti_thread = req->rq_svc_thread;
@@ -990,7 +1017,8 @@ struct obd_device {
         unsigned long obd_attached:1,      /* finished attach */
                       obd_set_up:1,        /* finished setup */
                       obd_recovering:1,    /* there are recoverable clients */
-                      obd_abort_recovery:1,/* somebody ioctl'ed us to abort */
+                      obd_abort_recovery:1,/* recovery expired */
+                      obd_version_recov:1, /* obd uses version checking */
                       obd_replayable:1,    /* recovery is enabled; inform clients */
                       obd_no_transno:1,    /* no committed-transno notification */
                       obd_no_recov:1,      /* fail instead of retry messages */
@@ -1013,6 +1041,7 @@ struct obd_device {
         atomic_t                obd_refcount;
         cfs_waitq_t             obd_refcount_waitq;
         struct list_head        obd_exports;
+        struct list_head        obd_delayed_exports;
         int                     obd_num_exports;
         spinlock_t              obd_nid_lock;
         struct ldlm_namespace  *obd_namespace;
@@ -1041,13 +1070,12 @@ struct obd_device {
         int                              obd_max_recoverable_clients;
         int                              obd_connected_clients;
         int                              obd_recoverable_clients;
+        int                              obd_delayed_clients;
         spinlock_t                       obd_processing_task_lock; /* BH lock (timer) */
         __u64                            obd_next_recovery_transno;
         int                              obd_replayed_requests;
         int                              obd_requests_queued_for_recovery;
         cfs_waitq_t                      obd_next_transno_waitq;
-        struct list_head                 obd_uncommitted_replies;
-        spinlock_t                       obd_uncommitted_replies_lock;
         cfs_timer_t                      obd_recovery_timer;
         time_t                           obd_recovery_start; /* seconds */
         time_t                           obd_recovery_end; /* seconds, for lprocfs_status */
@@ -1571,22 +1599,24 @@ int lvfs_check_io_health(struct obd_device *obd, struct file *file);
 #define OBD_CALC_STRIPE_END     2
 
 static inline void obd_transno_commit_cb(struct obd_device *obd, __u64 transno,
-                                         int error)
+                                         struct obd_export *exp, int error)
 {
         if (error) {
                 CERROR("%s: transno "LPU64" commit error: %d\n",
                        obd->obd_name, transno, error);
                 return;
         }
-        if (transno > obd->obd_last_committed) {
-                CDEBUG(D_INFO, "%s: transno "LPD64" committed\n",
+        if (exp && transno > exp->exp_last_committed) { /* exp may be NULL */
+                CDEBUG(D_HA, "%s: transno "LPU64" committed\n",
                        obd->obd_name, transno);
-                obd->obd_last_committed = transno;
-                ptlrpc_commit_replies (obd);
+                exp->exp_last_committed = transno;
+                ptlrpc_commit_replies(exp);
         } else {
-                CDEBUG(D_INFO, "%s: transno "LPD64" committed\n",
+                CDEBUG(D_INFO, "%s: transno "LPU64" committed\n",
                        obd->obd_name, transno);
         }
+        if (transno > obd->obd_last_committed) /* device-wide, even w/o exp */
+                obd->obd_last_committed = transno;
 }
 
 static inline void init_obd_quota_ops(quota_interface_t *interface,
index 7096eaa..75bc6d2 100644 (file)
@@ -327,6 +327,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_TGT_REPLAY_DROP         0x707
 #define OBD_FAIL_TGT_FAKE_EXP            0x708
 #define OBD_FAIL_TGT_REPLAY_DELAY        0x709
+#define OBD_FAIL_TGT_LAST_REPLAY         0x710
 
 #define OBD_FAIL_MDC_REVALIDATE_PAUSE    0x800
 #define OBD_FAIL_MDC_ENQUEUE_PAUSE       0x801
index 2c498dc..ac048ee 100644 (file)
@@ -188,7 +188,7 @@ out:
         RETURN(rc);
 }
 
-static void destroy_import(struct obd_import *imp)
+void client_destroy_import(struct obd_import *imp)
 {
         /* drop security policy instance after all rpc finished/aborted
          * to let all busy contexts be released. */
@@ -539,7 +539,7 @@ int client_disconnect_export(struct obd_export *exp)
                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
                 imp->imp_rq_pool = NULL;
         }
-        destroy_import(imp);
+        client_destroy_import(imp);
         cli->cl_import = NULL;
 
         EXIT;
@@ -962,7 +962,7 @@ dont_check_exports:
                 atomic_inc(&target->obd_lock_replay_clients);
                 if (target->obd_connected_clients ==
                     target->obd_max_recoverable_clients)
-                        wake_up(&target->obd_next_transno_waitq);
+                        cfs_waitq_signal(&target->obd_next_transno_waitq);
         }
         spin_unlock_bh(&target->obd_processing_task_lock);
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
@@ -986,7 +986,7 @@ dont_check_exports:
                  */
                 sptlrpc_import_inval_all_ctx(export->exp_imp_reverse);
 
-                destroy_import(export->exp_imp_reverse);
+                client_destroy_import(export->exp_imp_reverse);
         }
 
         /* for the rest part, we return -ENOTCONN in case of errors
@@ -1055,7 +1055,7 @@ void target_destroy_export(struct obd_export *exp)
         /* exports created from last_rcvd data, and "fake"
            exports created by lctl don't have an import */
         if (exp->exp_imp_reverse != NULL)
-                destroy_import(exp->exp_imp_reverse);
+                client_destroy_import(exp->exp_imp_reverse);
 
         /* We cancel locks at disconnect time, but this will catch any locks
          * granted in a race with recovery-induced disconnect. */
@@ -1189,7 +1189,7 @@ static void target_finish_recovery(struct obd_device *obd)
             list_empty(&obd->obd_final_req_queue)) {
                 obd->obd_processing_task = 0;
         } else {
-                CERROR("%s: Recovery queues ( %s%s%s) are empty\n",
+                CERROR("%s: Recovery queues ( %s%s%s) are not empty\n",
                        obd->obd_name,
                        list_empty(&obd->obd_req_replay_queue) ? "" : "req ",
                        list_empty(&obd->obd_lock_replay_queue) ? "" : "lock ",
@@ -1465,7 +1465,8 @@ static int check_for_next_transno(struct obd_device *obd)
                  * to replay requests that demand on already committed ones
                  * also, we can replay first non-committed transation */
                 LASSERT(req_transno != 0);
-                if (req_transno == obd->obd_last_committed + 1) {
+                if (obd->obd_version_recov ||
+                    req_transno == obd->obd_last_committed + 1) {
                         obd->obd_next_recovery_transno = req_transno;
                 } else if (req_transno > obd->obd_last_committed) {
                         /* can't continue recovery: have no needed transno */
@@ -1564,6 +1565,11 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
                 req = list_entry(obd->obd_final_req_queue.next,
                                  struct ptlrpc_request, rq_list);
                 list_del_init(&req->rq_list);
+                if (req->rq_export->exp_in_recovery) {
+                        spin_lock(&req->rq_export->exp_lock);
+                        req->rq_export->exp_in_recovery = 0;
+                        spin_unlock(&req->rq_export->exp_lock);
+                }
         } else {
                 req = NULL;
         }
@@ -1571,6 +1577,11 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
         return req;
 }
 
+static inline int req_vbr_done(struct obd_export *exp)
+{
+        return (exp->exp_vbr_failed == 0); /* 1 iff exp_vbr_failed is clear */
+}
+
 static inline int req_replay_done(struct obd_export *exp)
 {
         return (exp->exp_req_replay_needed == 0);
@@ -1588,7 +1599,7 @@ static inline int connect_done(struct obd_export *exp)
 
 static int check_for_clients(struct obd_device *obd)
 {
-        if (obd->obd_abort_recovery)
+        if (obd->obd_abort_recovery || obd->obd_version_recov)
                 return 1;
         LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
         if (obd->obd_no_conn == 0 &&
@@ -1639,7 +1650,8 @@ static void resume_recovery_timer(struct obd_device *obd)
 
 static int target_recovery_thread(void *arg)
 {
-        struct obd_device *obd = arg;
+        struct lu_target *lut = arg;
+        struct obd_device *obd = lut->lut_obd;
         struct ptlrpc_request *req;
         struct target_recovery_data *trd = &obd->obd_recovery_data;
         struct l_wait_info lwi = { 0 };
@@ -1665,8 +1677,8 @@ static int target_recovery_thread(void *arg)
         env.le_ctx.lc_thread = thread;
 
         CERROR("%s: started recovery thread pid %d\n", obd->obd_name,
-               current->pid);
-        trd->trd_processing_task = current->pid;
+               cfs_curproc_pid());
+        trd->trd_processing_task = cfs_curproc_pid();
 
         obd->obd_recovering = 1;
         complete(&trd->trd_starting);
@@ -1681,24 +1693,24 @@ static int target_recovery_thread(void *arg)
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
         /* If some clients haven't connected in time, evict them */
-        if (obd->obd_abort_recovery) {
+        if (obd->obd_connected_clients < obd->obd_max_recoverable_clients) {
                 CWARN("Some clients haven't connect in time (%d/%d),"
                        "evict them\n", obd->obd_connected_clients,
                        obd->obd_max_recoverable_clients);
-                obd->obd_abort_recovery = obd->obd_stopping;
-                class_disconnect_stale_exports(obd, connect_done, 
-                                               exp_flags_from_obd(obd) | 
+                class_disconnect_stale_exports(obd, connect_done,
+                                               exp_flags_from_obd(obd) |
                                                OBD_OPT_ABORT_RECOV);
         }
+
         /* next stage: replay requests */
         delta = jiffies;
         obd->obd_req_replaying = 1;
         CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n",
-              atomic_read(&obd->obd_req_replay_clients),
-              obd->obd_next_recovery_transno);
+               atomic_read(&obd->obd_req_replay_clients),
+               obd->obd_next_recovery_transno);
         resume_recovery_timer(obd);
         while ((req = target_next_replay_req(obd))) {
-                LASSERT(trd->trd_processing_task == current->pid);
+                LASSERT(trd->trd_processing_task == cfs_curproc_pid());
                 DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
                           lustre_msg_get_transno(req->rq_reqmsg),
                           libcfs_nid2str(req->rq_peer.nid));
@@ -1716,21 +1728,21 @@ static int target_recovery_thread(void *arg)
 
         /* If some clients haven't replayed requests in time, evict them */
         if (obd->obd_abort_recovery) {
-                CDEBUG(D_ERROR, "req replay timed out, aborting ...\n");
-                obd->obd_abort_recovery = obd->obd_stopping;
-                class_disconnect_stale_exports(obd, req_replay_done, 
-                                               exp_flags_from_obd(obd) | 
+                CDEBUG(D_WARNING, "req replay is aborted\n");
+                class_disconnect_stale_exports(obd, req_replay_done,
+                                               exp_flags_from_obd(obd) |
                                                OBD_OPT_ABORT_RECOV);
                 abort_req_replay_queue(obd);
         }
+        LASSERT(list_empty(&obd->obd_req_replay_queue));
 
         /* The second stage: replay locks */
         CDEBUG(D_INFO, "2: lock replay stage - %d clients\n",
                atomic_read(&obd->obd_lock_replay_clients));
         resume_recovery_timer(obd);
         while ((req = target_next_replay_lock(obd))) {
-                LASSERT(trd->trd_processing_task == current->pid);
-                DEBUG_REQ(D_HA|D_WARNING, req, "processing lock from %s: ",
+                LASSERT(trd->trd_processing_task == cfs_curproc_pid());
+                DEBUG_REQ(D_HA, req, "processing lock from %s: ",
                           libcfs_nid2str(req->rq_peer.nid));
                 handle_recovery_req(thread, req,
                                     trd->trd_recovery_handler);
@@ -1743,28 +1755,34 @@ static int target_recovery_thread(void *arg)
         /* If some clients haven't replayed requests in time, evict them */
         if (obd->obd_abort_recovery) {
                 int stale;
-                CERROR("lock replay timed out, aborting ...\n");
-                obd->obd_abort_recovery = obd->obd_stopping;
-                stale = class_disconnect_stale_exports(obd, lock_replay_done, 
-                                                       exp_flags_from_obd(obd) | 
+                CERROR("lock replay is aborted\n");
+                stale = class_disconnect_stale_exports(obd, lock_replay_done,
+                                                       exp_flags_from_obd(obd) |
                                                        OBD_OPT_ABORT_RECOV);
                 abort_lock_replay_queue(obd);
         }
+        LASSERT(list_empty(&obd->obd_lock_replay_queue));
 
+        /* The third stage: reply on final pings */
+        CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
+        /** Update server last boot epoch */
+        lut_boot_epoch_update(lut);
         /* We drop recoverying flag to forward all new requests
          * to regular mds_handle() since now */
         spin_lock_bh(&obd->obd_processing_task_lock);
         obd->obd_recovering = obd->obd_abort_recovery = 0;
         spin_unlock_bh(&obd->obd_processing_task_lock);
-        /* The third stage: reply on final pings */
-        CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
         while ((req = target_next_final_ping(obd))) {
-                LASSERT(trd->trd_processing_task == current->pid);
+                LASSERT(trd->trd_processing_task == cfs_curproc_pid());
                 DEBUG_REQ(D_HA, req, "processing final ping from %s: ",
                           libcfs_nid2str(req->rq_peer.nid));
                 handle_recovery_req(thread, req,
                                     trd->trd_recovery_handler);
         }
+        /* evict exports failed VBR */
+        class_disconnect_stale_exports(obd, req_vbr_done,
+                                       exp_flags_from_obd(obd) |
+                                       OBD_OPT_ABORT_RECOV);
 
         delta = (jiffies - delta) / HZ;
         CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
@@ -1784,8 +1802,10 @@ static int target_recovery_thread(void *arg)
         RETURN(rc);
 }
 
-int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+static int target_start_recovery_thread(struct lu_target *lut,
+                                        svc_handler_t handler)
 {
+        struct obd_device *obd = lut->lut_obd;
         int rc = 0;
         struct target_recovery_data *trd = &obd->obd_recovery_data;
 
@@ -1794,7 +1814,7 @@ int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
         init_completion(&trd->trd_finishing);
         trd->trd_recovery_handler = handler;
 
-        if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
+        if (kernel_thread(target_recovery_thread, lut, 0) > 0) {
                 wait_for_completion(&trd->trd_starting);
                 LASSERT(obd->obd_recovering != 0);
         } else
@@ -1810,7 +1830,7 @@ void target_stop_recovery_thread(struct obd_device *obd)
                 struct target_recovery_data *trd = &obd->obd_recovery_data;
                 CERROR("%s: Aborting recovery\n", obd->obd_name);
                 obd->obd_abort_recovery = 1;
-                wake_up(&obd->obd_next_transno_waitq);
+                cfs_waitq_signal(&obd->obd_next_transno_waitq);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 wait_for_completion(&trd->trd_finishing);
         } else {
@@ -1834,21 +1854,26 @@ static void target_recovery_expired(unsigned long castmeharder)
                       obd->obd_name, obd->obd_recoverable_clients,
                       cfs_time_current_sec()- obd->obd_recovery_start,
                       obd->obd_connected_clients);
+
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_recovering)
-                obd->obd_abort_recovery = 1;
+        obd->obd_version_recov = 1;
+        CDEBUG(D_INFO, "VBR is used for %d clients from t"LPU64"\n",
+               atomic_read(&obd->obd_req_replay_clients),
+               obd->obd_next_recovery_transno);
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
-void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
+void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
 {
+        struct obd_device *obd = lut->lut_obd;
         if (obd->obd_max_recoverable_clients == 0)
                 return;
 
         CWARN("RECOVERY: service %s, %d recoverable clients, "
               "last_transno "LPU64"\n", obd->obd_name,
               obd->obd_max_recoverable_clients, obd->obd_last_committed);
+        LASSERT(obd->obd_stopping == 0);
         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
         obd->obd_recovery_start = 0;
         obd->obd_recovery_end = 0;
@@ -1856,13 +1881,14 @@ void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
         /* bz13079: this should be set to desired value for ost but not for mds */
         obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
         cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
-        target_start_recovery_thread(obd, handler);
+        target_start_recovery_thread(lut, handler);
 }
 EXPORT_SYMBOL(target_recovery_init);
 
 #endif
 
-int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+static int target_process_req_flags(struct obd_device *obd,
+                                    struct ptlrpc_request *req)
 {
         struct obd_export *exp = req->rq_export;
         LASSERT(exp != NULL);
@@ -1879,7 +1905,6 @@ int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
                         obd->obd_recoverable_clients--;
                         if (atomic_read(&obd->obd_req_replay_clients) == 0)
                                 CDEBUG(D_HA, "all clients have replayed reqs\n");
-                        wake_up(&obd->obd_next_transno_waitq);
                 }
                 spin_unlock_bh(&obd->obd_processing_task_lock);
         }
@@ -1895,7 +1920,6 @@ int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
                         atomic_dec(&obd->obd_lock_replay_clients);
                         if (atomic_read(&obd->obd_lock_replay_clients) == 0)
                                 CDEBUG(D_HA, "all clients have replayed locks\n");
-                        wake_up(&obd->obd_next_transno_waitq);
                 }
                 spin_unlock_bh(&obd->obd_processing_task_lock);
         }
@@ -1909,7 +1933,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         struct list_head *tmp;
         int inserted = 0;
         __u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
-
         ENTRY;
 
         if (obd->obd_recovery_data.trd_processing_task == cfs_curproc_pid()) {
@@ -1927,6 +1950,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                         RETURN(-ENOMEM);
                 DEBUG_REQ(D_HA, req, "queue final req");
                 spin_lock_bh(&obd->obd_processing_task_lock);
+                cfs_waitq_signal(&obd->obd_next_transno_waitq);
                 if (obd->obd_recovering)
                         list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
                 else {
@@ -1948,6 +1972,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                         RETURN(-ENOMEM);
                 DEBUG_REQ(D_HA, req, "queue lock replay req");
                 spin_lock_bh(&obd->obd_processing_task_lock);
+                cfs_waitq_signal(&obd->obd_next_transno_waitq);
                 LASSERT(obd->obd_recovering);
                 /* usually due to recovery abort */
                 if (!req->rq_export->exp_in_recovery) {
@@ -1958,7 +1983,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 LASSERT(req->rq_export->exp_lock_replay_needed);
                 list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                wake_up(&obd->obd_next_transno_waitq);
                 RETURN(0);
         }
 
@@ -2043,7 +2067,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
 
         obd->obd_requests_queued_for_recovery++;
-        wake_up(&obd->obd_next_transno_waitq);
+        cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
         RETURN(0);
 }
@@ -2156,21 +2180,18 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         rs->rs_transno   = req->rq_transno;
         rs->rs_export    = exp;
 
-        spin_lock(&obd->obd_uncommitted_replies_lock);
-
+        spin_lock(&exp->exp_uncommitted_replies_lock);
         CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
-               rs->rs_transno, obd->obd_last_committed);
-        if (rs->rs_transno > obd->obd_last_committed) {
+               rs->rs_transno, exp->exp_last_committed);
+        if (rs->rs_transno > exp->exp_last_committed) {
                 /* not committed already */
-                list_add_tail (&rs->rs_obd_list,
-                               &obd->obd_uncommitted_replies);
+                list_add_tail(&rs->rs_obd_list,
+                              &exp->exp_uncommitted_replies);
         }
+        spin_unlock (&exp->exp_uncommitted_replies_lock);
 
-        spin_unlock (&obd->obd_uncommitted_replies_lock);
-        spin_lock (&exp->exp_lock);
-
-        list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
-
+        spin_lock(&exp->exp_lock);
+        list_add_tail(&rs->rs_exp_list, &exp->exp_outstanding_replies);
         spin_unlock(&exp->exp_lock);
 
         netrc = target_send_reply_msg (req, rc, fail_id);
@@ -2191,7 +2212,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         }
 
         spin_lock(&rs->rs_lock);
-        if (rs->rs_transno <= obd->obd_last_committed ||
+        if (rs->rs_transno <= exp->exp_last_committed ||
             (!rs->rs_on_net && !rs->rs_no_ack) ||
              list_empty(&rs->rs_exp_list) ||     /* completed already */
              list_empty(&rs->rs_obd_list)) {
@@ -2214,26 +2235,19 @@ int target_handle_ping(struct ptlrpc_request *req)
 
 void target_committed_to_req(struct ptlrpc_request *req)
 {
-        struct obd_device *obd;
-
-        if (req == NULL || req->rq_export == NULL)
-                return;
-
-        obd = req->rq_export->exp_obd;
-        if (obd == NULL)
-                return;
+        struct obd_export *exp = req->rq_export; /* NOTE(review): unchecked */
 
-        if (!obd->obd_no_transno && req->rq_repmsg != NULL)
+        if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL)
                 lustre_msg_set_last_committed(req->rq_repmsg,
-                                              obd->obd_last_committed);
+                                              exp->exp_last_committed);
         else
                 DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
-                          "%d)", obd->obd_no_transno, req->rq_repmsg == NULL);
+                          "%d)", exp->exp_obd->obd_no_transno,
+                          req->rq_repmsg == NULL);
 
         CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
-               obd->obd_last_committed, req->rq_transno, req->rq_xid);
+               exp->exp_last_committed, req->rq_transno, req->rq_xid);
 }
-
 EXPORT_SYMBOL(target_committed_to_req);
 
 int target_handle_qc_callback(struct ptlrpc_request *req)
index ae31ec6..3aabe5e 100644 (file)
@@ -2557,7 +2557,6 @@ EXPORT_SYMBOL(client_obd_setup);
 EXPORT_SYMBOL(client_obd_cleanup);
 EXPORT_SYMBOL(client_connect_import);
 EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_start_recovery_thread);
 EXPORT_SYMBOL(target_stop_recovery_thread);
 EXPORT_SYMBOL(target_handle_connect);
 EXPORT_SYMBOL(target_cleanup_recovery);
index 5a34a82..7baec8c 100644 (file)
@@ -177,7 +177,7 @@ int liblustre_process_log(struct config_llog_instance *cfg,
                 GOTO(out_cleanup, rc = -ENOMEM);
 
         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID |
-                                 OBD_CONNECT_AT;
+                                 OBD_CONNECT_AT | OBD_CONNECT_VBR;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd->ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
index 5e8acf3..0234909 100644 (file)
@@ -2017,7 +2017,8 @@ llu_fsswop_mount(const char *source,
                            sizeof(async), &async, NULL);
 
         ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION |
-                                OBD_CONNECT_FID | OBD_CONNECT_AT;
+                                OBD_CONNECT_FID | OBD_CONNECT_AT |
+                                OBD_CONNECT_VBR;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd.ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
index b89b895..e780458 100644 (file)
@@ -196,7 +196,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT_VERSION  | OBD_CONNECT_MDS_CAPA |
                                   OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
                                   OBD_CONNECT_FID      | OBD_CONNECT_AT |
-                                  OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT;
+                                  OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT |
+                                  OBD_CONNECT_VBR;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
@@ -339,7 +340,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT_CANCELSET | OBD_CONNECT_FID      |
                                   OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK|
                                   OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT |
-                                  OBD_CONNECT_OSS_CAPA | OBD_CONNECT_GRANT_SHRINK;
+                                  OBD_CONNECT_OSS_CAPA | OBD_CONNECT_VBR|
+                                  OBD_CONNECT_GRANT_SHRINK;
 
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
index d69e284..f24135d 100644 (file)
@@ -146,6 +146,8 @@ static char *fsfilt_ext3_uuid(struct super_block *sb)
  */
 static __u64 fsfilt_ext3_get_version(struct inode *inode)
 {
+        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+               EXT3_I(inode)->i_fs_version, inode->i_ino);
         return EXT3_I(inode)->i_fs_version;
 }
 
@@ -156,7 +158,12 @@ static __u64 fsfilt_ext3_set_version(struct inode *inode, __u64 new_version)
 {
         __u64 old_version = EXT3_I(inode)->i_fs_version;
 
+        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               new_version, old_version, inode->i_ino);
         (EXT3_I(inode))->i_fs_version = new_version;
+        /* version is set after all inode operations are finished, so we should
+         * mark it dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
         return old_version;
 }
 
index 8586475..b815d12 100644 (file)
@@ -84,6 +84,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
         mdd->mdd_txn_cb.dtc_cookie = mdd;
+        mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD;
         CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage);
         mdd->mdd_atime_diff = MAX_ATIME_DIFF;
 
index 0a86597..970ef8b 100644 (file)
@@ -2231,6 +2231,24 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
         return next->do_ops->do_object_sync(env, next);
 }
 
+/**
+ * md_object_operations::moo_version_get method for mdd objects.
+ *
+ * Asserts that the mdd object behind \a obj exists and returns whatever
+ * do_version_get() reports for its child object.
+ */
+static dt_obj_version_t mdd_version_get(const struct lu_env *env,
+                                        struct md_object *obj)
+{
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+        LASSERT(mdd_object_exists(mdd_obj));
+        return do_version_get(env, mdd_object_child(mdd_obj));
+}
+
+/**
+ * md_object_operations::moo_version_set method for mdd objects.
+ *
+ * Asserts that the mdd object behind \a obj exists and passes \a version
+ * to do_version_set() for its child object.  The function is void, so the
+ * forwarded call is a plain statement and not the operand of a return.
+ */
+static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
+                            dt_obj_version_t version)
+{
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+        LASSERT(mdd_object_exists(mdd_obj));
+        do_version_set(env, mdd_object_child(mdd_obj), version);
+}
+
 const struct md_object_operations mdd_obj_ops = {
         .moo_permission    = mdd_permission,
         .moo_attr_get      = mdd_attr_get,
@@ -2248,5 +2266,7 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_readlink      = mdd_readlink,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
+        .moo_version_get   = mdd_version_get,
+        .moo_version_set   = mdd_version_set,
         .moo_path          = mdd_path,
 };
index 1f03d81..3592985 100644 (file)
@@ -99,9 +99,9 @@ static int write_capa_keys(const struct lu_env *env,
         for (i = 0; i < 2; i++) {
                 lck_cpu_to_le(tmp, &keys[i]);
 
-                rc = mdt_record_write(env, mdt->mdt_ck_obj,
-                                      mdt_buf_const(env, tmp, sizeof(*tmp)),
-                                      &off, th);
+                rc = dt_record_write(env, mdt->mdt_ck_obj,
+                                     mdt_buf_const(env, tmp, sizeof(*tmp)),
+                                     &off, th);
                 if (rc)
                         break;
         }
@@ -125,8 +125,8 @@ static int read_capa_keys(const struct lu_env *env,
         tmp = &mti->mti_capa_key;
 
         for (i = 0; i < 2; i++) {
-                rc = mdt_record_read(env, mdt->mdt_ck_obj,
-                                     mdt_buf(env, tmp, sizeof(*tmp)), &off);
+                rc = dt_record_read(env, mdt->mdt_ck_obj,
+                                    mdt_buf(env, tmp, sizeof(*tmp)), &off);
                 if (rc)
                         return rc;
 
index d84f3f6..0099f50 100644 (file)
@@ -2652,6 +2652,10 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        info->mti_mos[0] = NULL;
+        info->mti_mos[1] = NULL;
+        info->mti_mos[2] = NULL;
+        info->mti_mos[3] = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
         info->mti_body = NULL;
@@ -3355,7 +3359,8 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         rep->lock_policy_res2 = clear_serious(rc);
 
         lhc->mlh_reg_lh.cookie = 0ull;
-        if (rc == -ENOTCONN || rc == -ENODEV) {
+        if (rc == -ENOTCONN || rc == -ENODEV ||
+            rc == -EOVERFLOW) { /**< if VBR failure then return error */
                 /*
                  * If it is the disconnect error (ENODEV & ENOCONN), the error
                  * will be returned by rq_status, and client at ptlrpc layer
@@ -4298,6 +4303,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         int                waited = 0;
         ENTRY;
 
+        target_recovery_fini(obd);
         /* At this point, obd exports might still be on the "obd_zombie_exports"
          * list, and obd_zombie_impexp_thread() is trying to destroy them.
          * We wait a little bit until all exports (except the self-export)
@@ -4321,7 +4327,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         ping_evictor_stop();
 
-        target_recovery_fini(obd);
         mdt_stop_ptlrpc_service(m);
         mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         mdt_obd_llog_cleanup(obd);
@@ -4329,6 +4334,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
 #endif
+        lut_fini(env, &m->mdt_lut);
         mdt_fs_cleanup(env, m);
         upcall_cache_cleanup(m->mdt_identity_cache);
         m->mdt_identity_cache = NULL;
@@ -4597,10 +4603,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_fini_proc, rc);
         }
 
-        rc = mdt_fld_init(env, obd->obd_name, m);
+        rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
         if (rc)
                 GOTO(err_fini_stack, rc);
 
+        rc = mdt_fld_init(env, obd->obd_name, m);
+        if (rc)
+                GOTO(err_lut, rc);
+
         rc = mdt_seq_init(env, obd->obd_name, m);
         if (rc)
                 GOTO(err_fini_fld, rc);
@@ -4660,7 +4670,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         server_put_mount_2(dev, lmi->lmi_mnt);
         lmi = NULL;
 
-        target_recovery_init(obd, mdt_recovery_handle);
+        target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
 
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
@@ -4710,6 +4720,8 @@ err_fini_seq:
         mdt_seq_fini(env, m);
 err_fini_fld:
         mdt_fld_fini(env, m);
+err_lut:
+        lut_fini(env, &m->mdt_lut);
 err_fini_stack:
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_fini_proc:
index c8f215c..953ede9 100644 (file)
  * struct lustre_handle
  */
 #include <lustre/lustre_idl.h>
+#include <lustre_disk.h>
+#include <lu_target.h>
 #include <md_object.h>
-#include <dt_object.h>
 #include <lustre_fid.h>
 #include <lustre_fld.h>
 #include <lustre_req_layout.h>
-/* LR_CLIENT_SIZE, etc. */
-#include <lustre_disk.h>
 #include <lustre_sec.h>
 #include <lvfs.h>
 #include <lustre_idmap.h>
 #include <lustre_eacl.h>
 #include <lustre_fsfilt.h>
 
-static inline __u64 lcd_last_transno(struct lsd_client_data *lcd)
-{
-        return max(lcd->lcd_last_transno, lcd->lcd_last_close_transno);
-}
-
-static inline __u64 lcd_last_xid(struct lsd_client_data *lcd)
-{
-        return max(lcd->lcd_last_xid, lcd->lcd_last_close_xid);
-}
-
 /* check if request's xid is equal to last one or not*/
 static inline int req_xid_is_last(struct ptlrpc_request *req)
 {
@@ -120,6 +109,8 @@ struct mdt_device {
         /* underlying device */
         struct md_device          *mdt_child;
         struct dt_device          *mdt_bottom;
+        /** target device */
+        struct lu_target           mdt_lut;
         /*
          * Options bit-fields.
          */
@@ -138,25 +129,13 @@ struct mdt_device {
         spinlock_t                 mdt_ioepoch_lock;
         __u64                      mdt_ioepoch;
 
-        /* Transaction related stuff here */
-        spinlock_t                 mdt_transno_lock;
-        __u64                      mdt_last_transno;
-
         /* transaction callbacks */
         struct dt_txn_callback     mdt_txn_cb;
-        /* last_rcvd file */
-        struct dt_object          *mdt_last_rcvd;
 
         /* these values should be updated from lov if necessary.
          * or should be placed somewhere else. */
         int                        mdt_max_mdsize;
         int                        mdt_max_cookiesize;
-        __u64                      mdt_mount_count;
-
-        /* last_rcvd data */
-        struct lr_server_data      mdt_lsd;
-        spinlock_t                 mdt_client_bitmap_lock;
-        unsigned long              mdt_client_bitmap[(LR_MAX_CLIENTS >> 3) / sizeof(long)];
 
         struct upcall_cache        *mdt_identity_cache;
 
@@ -188,6 +167,14 @@ struct mdt_device {
         int                        mdt_sec_level;
 };
 
+#define mdt_transno_lock        mdt_lut.lut_translock
+#define mdt_last_transno        mdt_lut.lut_last_transno
+#define mdt_last_rcvd           mdt_lut.lut_last_rcvd
+#define mdt_mount_count         mdt_lut.lut_mount_count
+#define mdt_lsd                 mdt_lut.lut_lsd
+#define mdt_client_bitmap_lock  mdt_lut.lut_client_bitmap_lock
+#define mdt_client_bitmap       mdt_lut.lut_client_bitmap
+
 #define MDT_SERVICE_WATCHDOG_FACTOR     (2000)
 #define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
 #define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
@@ -332,6 +319,9 @@ struct mdt_thread_info {
          */
         struct mdt_reint_record    mti_rr;
 
+        /** md objects included in operation */
+        struct mdt_object         *mti_mos[PTLRPC_NUM_VERSIONS];
+
         /*
          * Operation specification (currently create and lookup)
          */
@@ -382,6 +372,11 @@ struct mdt_thread_info {
         struct md_attr             mti_tmp_attr;
 };
 
+#define mti_parent      mti_mos[0]
+#define mti_child       mti_mos[1]
+#define mti_parent1     mti_mos[2]
+#define mti_child1      mti_mos[3]
+
 typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno,
                          void *data, int err);
 struct mdt_commit_cb {
@@ -394,7 +389,6 @@ enum mdt_txn_op {
         MDT_TXN_LAST_RCVD_WRITE_OP,
 };
 
-
 /*
  * Info allocated per-transaction.
  */
@@ -402,13 +396,13 @@ enum mdt_txn_op {
 struct mdt_txn_info {
         __u64                 txi_transno;
         unsigned int          txi_cb_count;
-        struct mdt_commit_cb  txi_cb[MDT_MAX_COMMIT_CB];
+        struct lut_commit_cb  txi_cb[MDT_MAX_COMMIT_CB];
 };
 
 extern struct lu_context_key mdt_txn_key;
 
 static inline void mdt_trans_add_cb(const struct thandle *th,
-                                    mdt_cb_t cb_func, void *cb_data)
+                                    lut_cb_t cb_func, void *cb_data)
 {
         struct mdt_txn_info *txi;
 
@@ -416,8 +410,8 @@ static inline void mdt_trans_add_cb(const struct thandle *th,
         LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb));
 
         /* add new callback */
-        txi->txi_cb[txi->txi_cb_count].mdt_cb_func = cb_func;
-        txi->txi_cb[txi->txi_cb_count].mdt_cb_data = cb_data;
+        txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func;
+        txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data;
         txi->txi_cb_count++;
 }
 
@@ -546,7 +540,7 @@ void mdt_reconstruct_generic(struct mdt_thread_info *mti,
                              struct mdt_lock_handle *lhc);
 
 extern void target_recovery_fini(struct obd_device *obd);
-extern void target_recovery_init(struct obd_device *obd,
+extern void target_recovery_init(struct lu_target *lut,
                                  svc_handler_t handler);
 int mdt_fs_setup(const struct lu_env *, struct mdt_device *,
                  struct obd_device *, struct lustre_sb_info *lsi);
@@ -618,6 +612,7 @@ int mdt_check_ucred(struct mdt_thread_info *);
 int mdt_init_ucred(struct mdt_thread_info *, struct mdt_body *);
 int mdt_init_ucred_reint(struct mdt_thread_info *);
 void mdt_exit_ucred(struct mdt_thread_info *);
+int mdt_version_get_check(struct mdt_thread_info *, int);
 
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
index 0f29b77..2e077ce 100644 (file)
@@ -1003,6 +1003,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
+                info->mti_mos[0] = parent;
+                info->mti_mos[1] = child;
+                result = mdt_version_get_check(info, 0);
+                if (result)
+                        GOTO(out_child, result);
+
                 /* Let lower layers know what is lock mode on directory. */
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
index c32ae5a..10794c3 100644 (file)
@@ -78,38 +78,6 @@ const struct lu_buf *mdt_buf_const(const struct lu_env *env,
         return buf;
 }
 
-int mdt_record_read(const struct lu_env *env,
-                    struct dt_object *dt, struct lu_buf *buf, loff_t *pos)
-{
-        int rc;
-
-        LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
-
-        rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
-
-        if (rc == buf->lb_len)
-                rc = 0;
-        else if (rc >= 0)
-                rc = -EFAULT;
-        return rc;
-}
-
-int mdt_record_write(const struct lu_env *env,
-                     struct dt_object *dt, const struct lu_buf *buf,
-                     loff_t *pos, struct thandle *th)
-{
-        int rc;
-
-        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
-        LASSERT(th != NULL);
-        rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
-        if (rc == buf->lb_len)
-                rc = 0;
-        else if (rc >= 0)
-                rc = -EFAULT;
-        return rc;
-}
-
 static inline int mdt_trans_credit_get(const struct lu_env *env,
                                        struct mdt_device *mdt,
                                        enum mdt_txn_op op)
@@ -166,61 +134,6 @@ void mdt_trans_stop(const struct lu_env *env,
         mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th);
 }
 
-/* last_rcvd handling */
-static inline void lsd_le_to_cpu(struct lr_server_data *buf,
-                                 struct lr_server_data *lsd)
-{
-        memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid));
-        lsd->lsd_last_transno     = le64_to_cpu(buf->lsd_last_transno);
-        lsd->lsd_mount_count      = le64_to_cpu(buf->lsd_mount_count);
-        lsd->lsd_feature_compat   = le32_to_cpu(buf->lsd_feature_compat);
-        lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat);
-        lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat);
-        lsd->lsd_server_size      = le32_to_cpu(buf->lsd_server_size);
-        lsd->lsd_client_start     = le32_to_cpu(buf->lsd_client_start);
-        lsd->lsd_client_size      = le16_to_cpu(buf->lsd_client_size);
-}
-
-static inline void lsd_cpu_to_le(struct lr_server_data *lsd,
-                                 struct lr_server_data *buf)
-{
-        memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (lsd->lsd_uuid));
-        buf->lsd_last_transno     = cpu_to_le64(lsd->lsd_last_transno);
-        buf->lsd_mount_count      = cpu_to_le64(lsd->lsd_mount_count);
-        buf->lsd_feature_compat   = cpu_to_le32(lsd->lsd_feature_compat);
-        buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat);
-        buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat);
-        buf->lsd_server_size      = cpu_to_le32(lsd->lsd_server_size);
-        buf->lsd_client_start     = cpu_to_le32(lsd->lsd_client_start);
-        buf->lsd_client_size      = cpu_to_le16(lsd->lsd_client_size);
-}
-
-static inline void lcd_le_to_cpu(struct lsd_client_data *buf,
-                                 struct lsd_client_data *lcd)
-{
-        memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid));
-        lcd->lcd_last_transno       = le64_to_cpu(buf->lcd_last_transno);
-        lcd->lcd_last_xid           = le64_to_cpu(buf->lcd_last_xid);
-        lcd->lcd_last_result        = le32_to_cpu(buf->lcd_last_result);
-        lcd->lcd_last_data          = le32_to_cpu(buf->lcd_last_data);
-        lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno);
-        lcd->lcd_last_close_xid     = le64_to_cpu(buf->lcd_last_close_xid);
-        lcd->lcd_last_close_result  = le32_to_cpu(buf->lcd_last_close_result);
-}
-
-static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
-                                 struct lsd_client_data *buf)
-{
-        memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid));
-        buf->lcd_last_transno       = cpu_to_le64(lcd->lcd_last_transno);
-        buf->lcd_last_xid           = cpu_to_le64(lcd->lcd_last_xid);
-        buf->lcd_last_result        = cpu_to_le32(lcd->lcd_last_result);
-        buf->lcd_last_data          = cpu_to_le32(lcd->lcd_last_data);
-        buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno);
-        buf->lcd_last_close_xid     = cpu_to_le64(lcd->lcd_last_close_xid);
-        buf->lcd_last_close_result  = cpu_to_le32(lcd->lcd_last_close_result);
-}
-
 static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
                                             struct mdt_device *mdt)
 {
@@ -230,9 +143,9 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
 
         mti->mti_off = 0;
-        rc = mdt_record_read(env, mdt->mdt_last_rcvd,
-                             mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)),
-                             &mti->mti_off);
+        rc = dt_record_read(env, mdt->mdt_last_rcvd,
+                            mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)),
+                            &mti->mti_off);
         if (rc == 0)
                 lsd_le_to_cpu(&mti->mti_lsd, &mdt->mdt_lsd);
 
@@ -244,13 +157,6 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
         return rc;
 }
 
-static void mdt_client_cb(const struct mdt_device *mdt, __u64 transno,
-                          void *data, int err)
-{
-        struct obd_device *obd = mdt2obd_dev(mdt);
-        target_client_add_cb(obd, transno, data, err);
-}
-
 static inline int mdt_last_rcvd_header_write(const struct lu_env *env,
                                              struct mdt_device *mdt,
                                              int need_sync)
@@ -276,12 +182,12 @@ static inline int mdt_last_rcvd_header_write(const struct lu_env *env,
         lsd_cpu_to_le(&mdt->mdt_lsd, &mti->mti_lsd);
 
         if (need_sync && mti->mti_exp)
-                mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp);
+                mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
 
-        rc = mdt_record_write(env, mdt->mdt_last_rcvd,
-                              mdt_buf_const(env, &mti->mti_lsd,
-                                            sizeof(mti->mti_lsd)),
-                              &mti->mti_off, th);
+        rc = dt_record_write(env, mdt->mdt_last_rcvd,
+                             mdt_buf_const(env, &mti->mti_lsd,
+                                           sizeof(mti->mti_lsd)),
+                             &mti->mti_off, th);
 
         mdt_trans_stop(env, mdt, th);
 
@@ -302,8 +208,8 @@ static int mdt_last_rcvd_read(const struct lu_env *env,
 
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         tmp = &mti->mti_lcd;
-        rc = mdt_record_read(env, mdt->mdt_last_rcvd,
-                             mdt_buf(env, tmp, sizeof(*tmp)), off);
+        rc = dt_record_read(env, mdt->mdt_last_rcvd,
+                            mdt_buf(env, tmp, sizeof(*tmp)), off);
         if (rc == 0)
                 lcd_le_to_cpu(tmp, lcd);
 
@@ -344,8 +250,8 @@ static int mdt_last_rcvd_write(const struct lu_env *env,
 
         lcd_cpu_to_le(lcd, tmp);
 
-        rc = mdt_record_write(env, mdt->mdt_last_rcvd,
-                              mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
+        rc = dt_record_write(env, mdt->mdt_last_rcvd,
+                             mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
 
         CDEBUG(D_INFO, "write lcd @%d rc = %d:\n"
                        "uuid = %s\n"
@@ -440,6 +346,8 @@ static int mdt_clients_data_init(const struct lu_env *env,
                         rc = mdt_client_add(env, mdt, cl_idx);
                         /* can't fail existing */
                         LASSERTF(rc == 0, "rc = %d\n", rc);
+                        /* VBR: set export last committed version */
+                        exp->exp_last_committed = last_transno;
                         lcd = NULL;
                         spin_lock(&exp->exp_lock);
                         exp->exp_connecting = 0;
@@ -578,7 +486,7 @@ static int mdt_server_data_init(const struct lu_env *env,
         lsd->lsd_mount_count = mdt->mdt_mount_count;
 
         /* save it, so mount count and last_transno is current */
-        rc = mdt_server_data_update(env, mdt, (mti->mti_exp && 
+        rc = mdt_server_data_update(env, mdt, (mti->mti_exp &&
                                                mti->mti_exp->exp_need_sync));
         if (rc)
                 GOTO(err_client, rc);
@@ -586,7 +494,7 @@ static int mdt_server_data_init(const struct lu_env *env,
         RETURN(0);
 
 err_client:
-        target_recovery_fini(obd);
+        class_disconnect_exports(obd);
 out:
         return rc;
 }
@@ -671,13 +579,13 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
-        /* 
+        /*
          * Until this operations will be committed the sync is needed
          * for this export. This should be done _after_ starting the
          * transaction so that many connecting clients will not bring
-         * server down with lots of sync writes. 
+         * server down with lots of sync writes.
          */
-        mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp);
+        mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
         spin_lock(&mti->mti_exp->exp_lock);
         mti->mti_exp->exp_need_sync = 1;
         spin_unlock(&mti->mti_exp->exp_lock);
@@ -813,11 +721,11 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
                         GOTO(free, rc = PTR_ERR(th));
 
                 if (need_sync) {
-                        /* 
+                        /*
                          * Until this operations will be committed the sync
-                         * is needed for this export. 
+                         * is needed for this export.
                          */
-                        mdt_trans_add_cb(th, mdt_client_cb, exp);
+                        mdt_trans_add_cb(th, lut_cb_client, exp);
                 }
 
                 mutex_down(&med->med_lcd_lock);
@@ -836,10 +744,10 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
         clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
         spin_unlock(&mdt->mdt_client_bitmap_lock);
 
-        /* 
+        /*
          * Make sure the server's last_transno is up to date. Do this
          * after the client is freed so we know all the client's
-         * transactions have been committed. 
+         * transactions have been committed.
          */
         mdt_server_data_update(env, mdt, need_sync);
 
@@ -863,7 +771,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
         loff_t off;
         int err;
         __s32 rc = th->th_result;
-        __u64 *transno_p;
 
         ENTRY;
         LASSERT(req);
@@ -882,14 +789,25 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
         }
 
         off = med->med_lr_off;
+        LASSERT(ergo(mti->mti_transno == 0, rc != 0));
         mutex_down(&med->med_lcd_lock);
         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
-                transno_p = &lcd->lcd_last_close_transno;
+                if (mti->mti_transno != 0)
+                        lcd->lcd_last_close_transno = mti->mti_transno;
                 lcd->lcd_last_close_xid = req->rq_xid;
                 lcd->lcd_last_close_result = rc;
         } else {
-                transno_p = &lcd->lcd_last_transno;
+                /* VBR: save versions in last_rcvd for reconstruct. */
+                __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                if (pre_versions) {
+                        lcd->lcd_pre_versions[0] = pre_versions[0];
+                        lcd->lcd_pre_versions[1] = pre_versions[1];
+                        lcd->lcd_pre_versions[2] = pre_versions[2];
+                        lcd->lcd_pre_versions[3] = pre_versions[3];
+                }
+                if (mti->mti_transno != 0)
+                        lcd->lcd_last_transno = mti->mti_transno;
                 lcd->lcd_last_xid = req->rq_xid;
                 lcd->lcd_last_result = rc;
                 /*XXX: save intent_disposition in mdt_thread_info?
@@ -898,20 +816,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                 lcd->lcd_last_data = mti->mti_opdata;
         }
 
-        /*
-         * When we store zero transno in lcd we can lost last transno value
-         * because lcd contains 0, but lsd is not yet written
-         * The server data should be updated also if the latest
-         * transno is rewritten by zero. See the bug 11125 for details.
-         */
-        if (mti->mti_transno == 0 &&
-            *transno_p == mdt->mdt_last_transno)
-                mdt_server_data_update(mti->mti_env, mdt, 
-                                      (mti->mti_exp && 
-                                       mti->mti_exp->exp_need_sync));
-
-        *transno_p = mti->mti_transno;
-
         if (off <= 0) {
                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
                 err = -EINVAL;
@@ -935,6 +839,17 @@ static int mdt_txn_start_cb(const struct lu_env *env,
         return 0;
 }
 
+/*
+ * Set new object versions.
+ *
+ * Walks all PTLRPC_NUM_VERSIONS slots of info->mti_mos[] and, for every
+ * slot that is not NULL, calls mo_version_set() on that object's child
+ * with info->mti_transno as the version value.
+ */
+static void mdt_versions_set(struct mdt_thread_info *info)
+{
+        int i;
+        for (i = 0; i < PTLRPC_NUM_VERSIONS; i++)
+                if (info->mti_mos[i] != NULL)
+                        mo_version_set(info->mti_env,
+                                       mdt_object_child(info->mti_mos[i]),
+                                       info->mti_transno);
+}
+
 /* Update last_rcvd records with latests transaction data */
 static int mdt_txn_stop_cb(const struct lu_env *env,
                            struct thandle *txn, void *cookie)
@@ -969,7 +884,6 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
                 if (mti->mti_transno != 0) {
                         CERROR("Replay transno "LPU64" failed: rc %i\n",
                                mti->mti_transno, txn->th_result);
-                        mti->mti_transno = 0;
                 }
         } else if (mti->mti_transno == 0) {
                 mti->mti_transno = ++ mdt->mdt_last_transno;
@@ -978,10 +892,14 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
                 if (mti->mti_transno > mdt->mdt_last_transno)
                         mdt->mdt_last_transno = mti->mti_transno;
         }
-
+        spin_unlock(&mdt->mdt_transno_lock);
         /* sometimes the reply message has not been successfully packed */
         LASSERT(req != NULL && req->rq_repmsg != NULL);
 
+        /** VBR: set new versions */
+        if (txn->th_result == 0)
+                mdt_versions_set(mti);
+
         /* filling reply data */
         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
                mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
@@ -992,7 +910,10 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
                          lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd));
         /* save transno for the commit callback */
         txi->txi_transno = mti->mti_transno;
-        spin_unlock(&mdt->mdt_transno_lock);
+
+        /* add separate commit callback for transaction handling because we need
+         * export as parameter */
+        mdt_trans_add_cb(txn, lut_cb_last_committed, mti->mti_exp);
 
         return mdt_last_rcvd_update(mti, txn);
 }
@@ -1002,29 +923,15 @@ static int mdt_txn_commit_cb(const struct lu_env *env,
                              struct thandle *txn, void *cookie)
 {
         struct mdt_device *mdt = cookie;
-        struct obd_device *obd = mdt2obd_dev(mdt);
         struct mdt_txn_info *txi;
         int i;
 
         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
 
-        /* copy of obd_transno_commit_cb() but with locking */
-        spin_lock(&mdt->mdt_transno_lock);
-        if (txi->txi_transno > obd->obd_last_committed) {
-                obd->obd_last_committed = txi->txi_transno;
-                spin_unlock(&mdt->mdt_transno_lock);
-                ptlrpc_commit_replies(obd);
-        } else
-                spin_unlock(&mdt->mdt_transno_lock);
-
-        if (txi->txi_transno)
-                CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
-                       obd->obd_name, txi->txi_transno);
-
         /* iterate through all additional callbacks */
         for (i = 0; i < txi->txi_cb_count; i++) {
-                txi->txi_cb[i].mdt_cb_func(mdt, txi->txi_transno,
-                                           txi->txi_cb[i].mdt_cb_data, 0);
+                txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno,
+                                           txi->txi_cb[i].lut_cb_data, 0);
         }
         return 0;
 }
@@ -1046,21 +953,14 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
         mdt->mdt_txn_cb.dtc_cookie = mdt;
+        mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
         CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
 
         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
-        o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid);
-        if (!IS_ERR(o)) {
-                mdt->mdt_last_rcvd = o;
-                rc = mdt_server_data_init(env, mdt, lsi);
-                if (rc)
-                        GOTO(put_last_rcvd, rc);
-        } else {
-                rc = PTR_ERR(o);
-                CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+        rc = mdt_server_data_init(env, mdt, lsi);
+        if (rc)
                 RETURN(rc);
-        }
 
         o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
         if (!IS_ERR(o)) {
@@ -1071,16 +971,15 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         } else {
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
-                GOTO(put_last_rcvd, rc);
+                GOTO(disconnect_exports, rc);
         }
         RETURN(0);
 
 put_ck_object:
         lu_object_put(env, &o->do_lu);
         mdt->mdt_ck_obj = NULL;
-put_last_rcvd:
-        lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
-        mdt->mdt_last_rcvd = NULL;
+disconnect_exports:
+        class_disconnect_exports(obd);
         return rc;
 }
 
@@ -1090,9 +989,6 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
 
         /* Remove transaction callback */
         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
-        if (mdt->mdt_last_rcvd)
-                lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
-        mdt->mdt_last_rcvd = NULL;
         if (mdt->mdt_ck_obj)
                 lu_object_put(env, &mdt->mdt_ck_obj->do_lu);
         mdt->mdt_ck_obj = NULL;
@@ -1151,6 +1047,20 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
         spin_unlock(&exp->exp_lock);
 }
 
+/**
+ * VBR: restore versions by copying lcd's pre-versions into the reply of req
+ */
+void mdt_vbr_reconstruct(struct ptlrpc_request *req,
+                         struct lsd_client_data *lcd)
+{
+        __u64 pre_versions[4] = {0};
+        pre_versions[0] = lcd->lcd_pre_versions[0];
+        pre_versions[1] = lcd->lcd_pre_versions[1];
+        pre_versions[2] = lcd->lcd_pre_versions[2];
+        pre_versions[3] = lcd->lcd_pre_versions[3];
+        lustre_msg_set_versions(req->rq_repmsg, pre_versions);
+}
+
 void mdt_req_from_lcd(struct ptlrpc_request *req,
                       struct lsd_client_data *lcd)
 {
@@ -1161,14 +1071,18 @@ void mdt_req_from_lcd(struct ptlrpc_request *req,
             lustre_msg_get_opc(req->rq_repmsg) == MDS_DONE_WRITING) {
                 req->rq_transno = lcd->lcd_last_close_transno;
                 req->rq_status = lcd->lcd_last_close_result;
-                lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
-                lustre_msg_set_status(req->rq_repmsg, req->rq_status);
         } else {
                 req->rq_transno = lcd->lcd_last_transno;
                 req->rq_status = lcd->lcd_last_result;
-                lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
-                lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+                mdt_vbr_reconstruct(req, lcd);
         }
+        if (req->rq_status != 0)
+                req->rq_transno = 0;
+        lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+        lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+        DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d",
+                  req->rq_transno, req->rq_status);
+
         mdt_steal_ack_locks(req);
 }
 
index 4de1f39..26f923d 100644 (file)
@@ -96,6 +96,51 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
         RETURN(rc);
 }
 
+int mdt_version_get_check(struct mdt_thread_info *info, int index)
+{
+        /** VBR: check version of mti_mos[index]; returns 0 or -EOVERFLOW */
+        struct md_object *mo;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        __u64 curr_version, *pre_versions;
+        ENTRY;
+
+        if (!exp_connect_vbr(req->rq_export))
+                RETURN(0);
+
+        LASSERT(info->mti_mos[index]);
+        LASSERT(mdt_object_exists(info->mti_mos[index]));
+        mo = mdt_object_child(info->mti_mos[index]);
+
+        curr_version = mo_version_get(info->mti_env, mo);
+        CDEBUG(D_INODE, "Version is "LPX64"\n", curr_version);
+        /** VBR: version is always checked because it costs nothing */
+        if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
+                pre_versions = lustre_msg_get_versions(req->rq_reqmsg);
+                LASSERT(index < PTLRPC_NUM_VERSIONS);
+                /** Sanity check for malformed buffers */
+                if (pre_versions == NULL) {
+                        CERROR("No versions in request buffer\n");
+                        spin_lock(&req->rq_export->exp_lock);
+                        req->rq_export->exp_vbr_failed = 1; /* mark export */
+                        spin_unlock(&req->rq_export->exp_lock);
+                        RETURN(-EOVERFLOW);
+                } else if (pre_versions[index] != curr_version) {
+                        CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+                               pre_versions[index], curr_version);
+                        spin_lock(&req->rq_export->exp_lock);
+                        req->rq_export->exp_vbr_failed = 1; /* mark export */
+                        spin_unlock(&req->rq_export->exp_lock);
+                        RETURN(-EOVERFLOW);
+                }
+        }
+        /** store the current version as pre-version in the reply buffer */
+        LASSERT(req->rq_repmsg != NULL);
+        pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+        if (pre_versions) /* reply may have no versions buffer */
+                pre_versions[index] = curr_version;
+        RETURN(0);
+}
+
 static int mdt_md_create(struct mdt_thread_info *info)
 {
         struct mdt_device       *mdt = info->mti_mdt;
@@ -136,6 +181,12 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
+                info->mti_mos[0] = parent;
+                info->mti_mos[1] = child;
+                rc = mdt_version_get_check(info, 0);
+                if (rc)
+                        GOTO(out_put_child, rc);
+
                 /* Let lower layer know current lock mode. */
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
@@ -158,6 +209,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                                    mdt_object_fid(child));
                 }
+out_put_child:
                 mdt_object_put(info->mti_env, child);
         } else
                 rc = PTR_ERR(child);
@@ -227,6 +279,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
         struct md_attr          *ma = &info->mti_attr;
         struct mdt_lock_handle  *lh;
         int som_update = 0;
+        int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID);
         int rc;
         ENTRY;
 
@@ -271,6 +324,14 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
 
+        /* VBR: update version if changed attrs are important for recovery */
+        if (do_vbr) {
+                info->mti_mos[0] = mo;
+                rc = mdt_version_get_check(info, 0);
+                if (rc)
+                        GOTO(out_unlock, rc);
+        }
+
         /* all attrs are packed into mti_attr in unpack_setattr */
         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
         if (rc != 0)
@@ -315,6 +376,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
+        /* start a log journal handle if needed */
         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
                     (rr->rr_flags & MRF_SETATTR_LOCKED)) {
@@ -497,6 +559,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
 
+        info->mti_mos[0] = mp;
+        rc = mdt_version_get_check(info, 0);
+        if (rc)
+                GOTO(out_unlock_parent, rc);
+
         mdt_reint_init_ma(info, ma);
         if (!ma->ma_lmm || !ma->ma_cookie)
                 GOTO(out_unlock_parent, rc = -EINVAL);
@@ -542,6 +609,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
 
+        info->mti_mos[1] = mc;
+        rc = mdt_version_get_check(info, 1);
+        if (rc)
+                GOTO(out_unlock_child, rc);
+
         /*
          * Now we can only make sure we need MA_INODE, in mdd layer, will check
          * whether need MA_LOV and MA_COOKIE.
@@ -555,6 +627,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 mdt_handle_last_unlink(info, mc, ma);
 
         EXIT;
+out_unlock_child:
         mdt_object_unlock_put(info, mc, child_lh, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, parent_lh, rc);
@@ -614,6 +687,11 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
+        info->mti_mos[0] = mp;
+        rc = mdt_version_get_check(info, 0);
+        if (rc)
+                GOTO(out_unlock_parent, rc);
+
         /* step 2: find & lock the source */
         lhs = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(lhs, LCK_EX);
@@ -633,11 +711,17 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
+        info->mti_mos[1] = ms;
+        rc = mdt_version_get_check(info, 1);
+        if (rc)
+                GOTO(out_unlock_child, rc);
+
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), lname, ma);
 
         EXIT;
+out_unlock_child:
         mdt_object_unlock_put(info, ms, lhs, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, lhp, rc);
@@ -871,6 +955,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
+        info->mti_mos[0] = msrcdir;
+        rc = mdt_version_get_check(info, 0);
+        if (rc)
+                GOTO(out_unlock_source, rc);
+
         /* step 2: find & lock the target dir. */
         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
@@ -892,7 +981,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
                                              MDS_INODELOCK_UPDATE,
                                              MDT_LOCAL_LOCK);
-                        if (rc != 0)
+                        if (rc != 0) {
+                                mdt_object_put(info->mti_env, mtgtdir);
+                                GOTO(out_unlock_source, rc);
+                        }
+
+                        info->mti_mos[1] = mtgtdir;
+                        rc = mdt_version_get_check(info, 1);
+                        if (rc)
                                 GOTO(out_unlock_target, rc);
                 }
         }
@@ -920,6 +1016,12 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 mdt_object_put(info->mti_env, mold);
                 GOTO(out_unlock_target, rc);
         }
+
+        info->mti_mos[2] = mold;
+        rc = mdt_version_get_check(info, 2);
+        if (rc)
+                GOTO(out_unlock_old, rc);
+
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
 
         /* step 4: find & lock the new object. */
@@ -947,6 +1049,12 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         mdt_object_put(info->mti_env, mnew);
                         GOTO(out_unlock_old, rc);
                 }
+
+                info->mti_mos[3] = mnew;
+                rc = mdt_version_get_check(info, 3);
+                if (rc)
+                        GOTO(out_unlock_new, rc);
+
                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
         } else if (rc != -EREMOTE && rc != -ENOENT)
                 GOTO(out_unlock_old, rc);
index 8e646b2..ae04caf 100644 (file)
@@ -62,7 +62,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
         static const char       user_string[] = "user.";
         int                     size, rc;
         ENTRY;
-        
+
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK))
                 RETURN(-ENOMEM);
 
@@ -75,7 +75,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
                     !strncmp(xattr_name, user_string, sizeof(user_string) - 1))
                         RETURN(-EOPNOTSUPP);
-                
+
                 size = mo_xattr_get(info->mti_env,
                                     mdt_object_child(info->mti_object),
                                     &LU_BUF_NULL, xattr_name);
@@ -352,6 +352,11 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
+        info->mti_mos[0] = obj;
+        rc = mdt_version_get_check(info, 0);
+        if (rc)
+                GOTO(out_unlock, rc);
+
         if (unlikely(!(valid & OBD_MD_FLCTIME))) {
                 CWARN("client miss to set OBD_MD_FLCTIME when "
                       "setxattr: [object "DFID"] [valid %llu]\n",
index 79c160e..e5d3c4d 100644 (file)
@@ -97,7 +97,8 @@ int dt_txn_hook_start(const struct lu_env *env,
 
         result = 0;
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
-                if (cb->dtc_txn_start == NULL)
+                if (cb->dtc_txn_start == NULL ||
+                    !(cb->dtc_tag & env->le_ctx.lc_tags))
                         continue;
                 result = cb->dtc_txn_start(env, param, cb->dtc_cookie);
                 if (result < 0)
@@ -115,7 +116,8 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
 
         result = 0;
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
-                if (cb->dtc_txn_stop == NULL)
+                if (cb->dtc_txn_stop == NULL ||
+                    !(cb->dtc_tag & env->le_ctx.lc_tags))
                         continue;
                 result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
                 if (result < 0)
@@ -133,7 +135,8 @@ int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn)
 
         result = 0;
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
-                if (cb->dtc_txn_commit == NULL)
+                if (cb->dtc_txn_commit == NULL ||
+                    !(cb->dtc_tag & env->le_ctx.lc_tags))
                         continue;
                 result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie);
                 if (result < 0)
@@ -400,5 +403,38 @@ void dt_global_fini(void)
         lu_context_key_degister(&dt_key);
 }
 
+int dt_record_read(const struct lu_env *env, struct dt_object *dt,
+                   struct lu_buf *buf, loff_t *pos)
+{
+        int rc; /* dbo_read() result, mapped to 0 or a negative value */
+
+        LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
+
+        rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
+
+        if (rc == buf->lb_len)
+                rc = 0; /* exactly lb_len was returned: success */
+        else if (rc >= 0)
+                rc = -EFAULT; /* any other non-negative result is an error */
+        return rc; /* negative dbo_read() results pass through unchanged */
+}
+EXPORT_SYMBOL(dt_record_read);
+
+int dt_record_write(const struct lu_env *env, struct dt_object *dt,
+                    const struct lu_buf *buf, loff_t *pos, struct thandle *th)
+{
+        int rc; /* dbo_write() result, mapped to 0 or a negative value */
+
+        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+        LASSERT(th != NULL); /* a transaction handle is mandatory */
+        rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
+        if (rc == buf->lb_len)
+                rc = 0; /* exactly lb_len was returned: success */
+        else if (rc >= 0)
+                rc = -EFAULT; /* any other non-negative result is an error */
+        return rc; /* negative dbo_write() results pass through unchanged */
+}
+EXPORT_SYMBOL(dt_record_write);
+
 const struct dt_index_features dt_directory_features;
 EXPORT_SYMBOL(dt_directory_features);
index 615b2f9..b0ba327 100644 (file)
@@ -723,6 +723,7 @@ static void class_export_destroy(struct obd_export *exp)
                 ptlrpc_put_connection_superhack(exp->exp_connection);
 
         LASSERT(list_empty(&exp->exp_outstanding_replies));
+        LASSERT(list_empty(&exp->exp_uncommitted_replies));
         LASSERT(list_empty(&exp->exp_req_replay_queue));
         LASSERT(list_empty(&exp->exp_queued_rpc));
         obd_destroy_export(exp);
@@ -781,6 +782,8 @@ struct obd_export *class_new_export(struct obd_device *obd,
         atomic_set(&export->exp_rpc_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
+        spin_lock_init(&export->exp_uncommitted_replies_lock);
+        CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
@@ -837,6 +840,15 @@ void class_unlink_export(struct obd_export *exp)
         exp->exp_obd->obd_num_exports--;
         spin_unlock(&exp->exp_obd->obd_dev_lock);
 
+        /* Keep these counters valid at all times */
+        spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
+        if (exp->exp_delayed)
+                exp->exp_obd->obd_delayed_clients--;
+        else if (exp->exp_in_recovery)
+                exp->exp_obd->obd_recoverable_clients--;
+        else if (exp->exp_obd->obd_recovering)
+                exp->exp_obd->obd_max_recoverable_clients--;
+        spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
         class_export_put(exp);
 }
 EXPORT_SYMBOL(class_unlink_export);
@@ -1125,9 +1137,10 @@ void class_disconnect_exports(struct obd_device *obd)
         ENTRY;
 
         /* Move all of the exports from obd_exports to a work list, en masse. */
+        CFS_INIT_LIST_HEAD(&work_list);
         spin_lock(&obd->obd_dev_lock);
-        list_add(&work_list, &obd->obd_exports);
-        list_del_init(&obd->obd_exports);
+        list_splice_init(&obd->obd_exports, &work_list);
+        list_splice_init(&obd->obd_delayed_exports, &work_list);
         spin_unlock(&obd->obd_dev_lock);
 
         if (!list_empty(&work_list)) {
@@ -1161,8 +1174,7 @@ int class_disconnect_stale_exports(struct obd_device *obd,
                 if (test_export(exp))
                         continue;
 
-                list_del(&exp->exp_obd_chain);
-                list_add(&exp->exp_obd_chain, &work_list);
+                list_move(&exp->exp_obd_chain, &work_list);
                 /* don't count self-export as client */
                 if (obd_uuid_equals(&exp->exp_client_uuid,
                                      &exp->exp_obd->obd_uuid))
index 2178f9e..41fb346 100644 (file)
@@ -91,10 +91,10 @@ int __llog_ctxt_put(struct llog_ctxt *ctxt)
         }
         olg->olg_ctxts[ctxt->loc_idx] = NULL;
         spin_unlock(&olg->olg_lock);
-        
+
         if (ctxt->loc_lcm)
                 lcm_put(ctxt->loc_lcm);
-       
+
         obd = ctxt->loc_obd;
         spin_lock(&obd->obd_dev_lock);
         spin_unlock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */
index 74b94e4..f5d7316 100644 (file)
@@ -247,14 +247,15 @@ int class_attach(struct lustre_cfg *lcfg)
         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
                  "obd %p obd_magic %08X != %08X\n",
                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
-        LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n",
-                 obd, obd->obd_name, name);
+        LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0,
+                 "%p obd_name %s != %s\n", obd, obd->obd_name, name);
 
         rwlock_init(&obd->obd_pool_lock);
         obd->obd_pool_limit = 0;
         obd->obd_pool_slv = 0;
 
         CFS_INIT_LIST_HEAD(&obd->obd_exports);
+        CFS_INIT_LIST_HEAD(&obd->obd_delayed_exports);
         CFS_INIT_LIST_HEAD(&obd->obd_exports_timed);
         CFS_INIT_LIST_HEAD(&obd->obd_nid_stats);
         spin_lock_init(&obd->obd_nid_lock);
@@ -277,9 +278,6 @@ int class_attach(struct lustre_cfg *lcfg)
 
         llog_group_init(&obd->obd_olg, FILTER_GROUP_LLOG);
 
-        spin_lock_init(&obd->obd_uncommitted_replies_lock);
-        CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
-
         len = strlen(uuid);
         if (len >= sizeof(obd->obd_uuid)) {
                 CERROR("uuid must be < %d bytes long\n",
@@ -499,7 +497,7 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
         /* Leave this on forever */
         obd->obd_stopping = 1;
         spin_unlock(&obd->obd_dev_lock);
-        
+
         if (lcfg->lcfg_bufcount >= 2 && LUSTRE_CFG_BUFLEN(lcfg, 1) > 0) {
                 for (flag = lustre_cfg_string(lcfg, 1); *flag != 0; flag++)
                         switch (*flag) {
@@ -859,7 +857,7 @@ int class_process_config(struct lustre_cfg *lcfg)
                 ldlm_timeout = max(lcfg->lcfg_num, 1U);
                 if (ldlm_timeout >= obd_timeout)
                         ldlm_timeout = max(obd_timeout / 3, 1U);
-                
+
                 GOTO(out, err = 0);
         }
         case LCFG_SET_UPCALL: {
index 76e558f..51344ba 100644 (file)
@@ -87,12 +87,38 @@ cfs_mem_cache_t *ll_fmd_cachep;
 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
 {
-        obd_transno_commit_cb(obd, transno, error);
+        struct obd_export *exp = cb_data;
+        obd_transno_commit_cb(obd, transno, exp, error);
+}
+
+int filter_version_get_check(struct obd_export *exp,
+                             struct obd_trans_info *oti, struct inode *inode)
+{
+        __u64 curr_version;
+
+        if (inode == NULL || oti == NULL) /* nothing to check or record */
+                RETURN(0);
+
+        curr_version = fsfilt_get_version(exp->exp_obd, inode);
+        if ((__s64)curr_version == -EOPNOTSUPP) /* no version: skip check */
+                RETURN(0);
+        /* VBR: version is always checked because it costs nothing */
+        if (oti->oti_pre_version != 0 &&
+            oti->oti_pre_version != curr_version) {
+                CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+                       oti->oti_pre_version, curr_version);
+                spin_lock(&exp->exp_lock);
+                exp->exp_vbr_failed = 1; /* mark the export on mismatch */
+                spin_unlock(&exp->exp_lock);
+                RETURN (-EOVERFLOW);
+        }
+        oti->oti_pre_version = curr_version; /* hand version back via oti */
+        RETURN(0);
 }
 
 /* Assumes caller has already pushed us into the kernel context. */
-int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
-                          int rc, int force_sync)
+int filter_finish_transno(struct obd_export *exp, struct inode *inode,
+                          struct obd_trans_info *oti, int rc, int force_sync)
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
@@ -109,24 +135,28 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                 RETURN(rc);
 
         /* we don't allocate new transnos for replayed requests */
+        spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                spin_lock(&filter->fo_translock);
                 last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
                 filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
-                spin_unlock(&filter->fo_translock);
-                oti->oti_transno = last_rcvd;
         } else {
-                spin_lock(&filter->fo_translock);
                 last_rcvd = oti->oti_transno;
                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
                         filter->fo_fsd->lsd_last_transno =
                                 cpu_to_le64(last_rcvd);
+        }
+        oti->oti_transno = last_rcvd;
+        if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) {
                 spin_unlock(&filter->fo_translock);
+                LBUG();
         }
         lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
+        lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
+        lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
+        spin_unlock(&filter->fo_translock);
 
-        /* could get xid from oti, if it's ever needed */
-        lcd->lcd_last_xid = 0;
+        if (inode)
+                fsfilt_set_version(exp->exp_obd, inode, last_rcvd);
 
         off = fed->fed_lr_off;
         if (off <= 0) {
@@ -139,13 +169,13 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                                                            last_rcvd,
                                                            oti->oti_handle,
                                                            filter_commit_cb,
-                                                           NULL);
+                                                           exp);
 
                 err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
                                           lcd, sizeof(*lcd), &off,
                                           force_sync | exp->exp_need_sync);
                 if (force_sync)
-                        filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err);
+                        filter_commit_cb(exp->exp_obd, last_rcvd, exp, err);
         }
         if (err) {
                 log_pri = D_ERROR;
@@ -328,6 +358,9 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
+                        fed->fed_lcd->lcd_last_epoch =
+                                              filter->fo_fsd->lsd_start_epoch;
+                        exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb, exp);
                         if (rc == 0) {
@@ -618,7 +651,7 @@ static int filter_init_export(struct obd_export *exp)
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
-        OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
+        OBD_FREE_PTR(filter->fo_fsd);
         filter->fo_fsd = NULL;
         OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
         filter->fo_last_rcvd_slots = NULL;
@@ -639,7 +672,6 @@ int filter_update_server_data(struct obd_device *obd, struct file *filp,
         CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
                le64_to_cpu(fsd->lsd_mount_count));
 
-        fsd->lsd_compat14 = fsd->lsd_last_transno;
         rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off, force_sync);
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
@@ -683,6 +715,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
         __u64 mount_count;
+        __u32 start_epoch;
         int cl_idx;
         loff_t off = 0;
         int rc;
@@ -754,7 +787,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 GOTO(err_fsd, rc = -EINVAL);
         }
 
-        CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
+        start_epoch = le32_to_cpu(fsd->lsd_start_epoch);
+
+        CDEBUG(D_INODE, "%s: server start_epoch : %#x\n",
+               obd->obd_name, start_epoch);
+        CDEBUG(D_INODE, "%s: server last_transno : "LPX64"\n",
                obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
                obd->obd_name, mount_count + 1);
@@ -834,12 +871,16 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                         /* can't fail for existing client */
                         LASSERTF(rc == 0, "rc = %d\n", rc);
 
-                        lcd = NULL;
+                        /* VBR: set export last committed */
+                        exp->exp_last_committed = last_rcvd;
                         spin_lock(&exp->exp_lock);
                         exp->exp_connecting = 0;
                         exp->exp_in_recovery = 0;
                         spin_unlock(&exp->exp_lock);
+                        spin_lock_bh(&obd->obd_processing_task_lock);
                         obd->obd_max_recoverable_clients++;
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        lcd = NULL;
                         class_export_put(exp);
                 }
 
@@ -856,7 +897,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 
         obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno);
 
-        target_recovery_init(obd, ost_handle);
+        target_recovery_init(&filter->fo_lut, ost_handle);
 
 out:
         filter->fo_mount_count = mount_count + 1;
@@ -1308,6 +1349,9 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
+        /** lu_target has very limited use in filter now */
+        lut_init(NULL, &filter->fo_lut, obd, NULL);
+
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
@@ -2069,10 +2113,11 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         spin_lock_init(&filter->fo_llog_list_lock);
 
         filter->fo_fl_oss_capa = 1;
+
         CFS_INIT_LIST_HEAD(&filter->fo_capa_keys);
         filter->fo_capa_hash = init_capa_hash();
         if (filter->fo_capa_hash == NULL)
-                GOTO(err_ops, rc = -ENOMEM);
+                GOTO(err_post, rc = -ENOMEM);
 
         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
         obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER,
@@ -2584,6 +2629,8 @@ static int filter_precleanup(struct obd_device *obd,
         case OBD_CLEANUP_EARLY:
                 break;
         case OBD_CLEANUP_EXPORTS:
+                /* Stop recovery before namespace cleanup. */
+                target_stop_recovery_thread(obd);
                 target_cleanup_recovery(obd);
                 rc = filter_llog_preclean(obd);
                 break;
@@ -2615,10 +2662,6 @@ static int filter_cleanup(struct obd_device *obd)
         lprocfs_obd_cleanup(obd);
         lquota_cleanup(filter_quota_interface_ref, obd);
 
-        /* Stop recovery before namespace cleanup. */
-        target_stop_recovery_thread(obd);
-        target_cleanup_recovery(obd);
-
         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
         obd->obd_namespace = NULL;
 
@@ -3106,7 +3149,6 @@ static void filter_revimp_update(struct obd_export *exp)
 static int filter_ping(struct obd_export *exp)
 {
         filter_fmd_expire(exp);
-
         return 0;
 }
 
@@ -3246,6 +3288,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                 old_size = i_size_read(inode);
         }
 
+        /* VBR: version recovery check */
+        rc = filter_version_get_check(exp, oti, inode);
+        if (rc)
+                GOTO(out_unlock, rc);
+
         /* If the inode still has SUID+SGID bits set (see filter_precreate())
          * then we will accept the UID+GID sent by the client during write for
          * initializing the ownership of this inode.  We only allow this to
@@ -3308,7 +3355,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
          * sure we have one left for the last_rcvd update. */
         err = fsfilt_extend(exp->exp_obd, inode, 1, handle);
 
-        rc = filter_finish_transno(exp, oti, rc, sync);
+        rc = filter_finish_transno(exp, inode, oti, rc, sync);
         if (sync) {
                 filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc);
                 fcc = NULL;
@@ -4047,6 +4094,12 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
          * (see BUG 4180) -bzzz
          */
         LOCK_INODE_MUTEX(dchild->d_inode);
+
+        /* VBR: version recovery check */
+        rc = filter_version_get_check(exp, oti, dchild->d_inode);
+        if (rc)
+                GOTO(cleanup, rc);
+
         handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR,
                                   NULL, 1);
         if (IS_ERR(handle)) {
@@ -4103,7 +4156,7 @@ cleanup:
                  * on commit. then we call callback directly to free
                  * the fcc.
                  */
-                rc = filter_finish_transno(exp, oti, rc, sync);
+                rc = filter_finish_transno(exp, NULL, oti, rc, sync);
                 if (sync) {
                         filter_cancel_cookies_cb(obd, 0, fcc, rc);
                         fcc = NULL;
index 9cb6de0..dbe4fae 100644 (file)
@@ -40,7 +40,6 @@
 #ifdef __KERNEL__
 # include <linux/spinlock.h>
 #endif
-#include <lustre_disk.h>
 #include <lustre_handles.h>
 #include <lustre_debug.h>
 #include <obd.h>
@@ -127,7 +126,7 @@ enum {
 extern int *obdfilter_created_scratchpad;
 
 extern void target_recovery_fini(struct obd_device *obd);
-extern void target_recovery_init(struct obd_device *obd,
+extern void target_recovery_init(struct lu_target *lut,
                                  svc_handler_t handler);
 
 /* filter.c */
@@ -138,8 +137,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
                                   const char *what, int quiet);
 #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0)
 
-int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc,
-                          int force_sync);
+int filter_finish_transno(struct obd_export *, struct inode *,
+                          struct obd_trans_info *, int rc, int force_sync);
 __u64 filter_next_id(struct filter_obd *, struct obdo *);
 __u64 filter_last_id(struct filter_obd *, obd_gr group);
 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
index 95835a3..3bd68f6 100644 (file)
@@ -500,7 +500,7 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
 
                 UNLOCK_INODE_MUTEX(inode);
 
-                rc2 = filter_finish_transno(exp, oti, 0, 0);
+                rc2 = filter_finish_transno(exp, inode, oti, 0, 0);
                 if (rc2 != 0) {
                         CERROR("can't close transaction: %d\n", rc2);
                         if (rc == 0)
index a6c7e98..b8e1a2b 100644 (file)
@@ -2078,6 +2078,35 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
+/*
+ * Return the 64-bit version (i_fs_version) of the ldiskfs inode behind \a dt.
+ */
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+                                               struct dt_object *dt)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        return LDISKFS_I(inode)->i_fs_version;
+}
+
+/*
+ * Set the 64-bit version of an inode; the old version is only logged.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+                                   dt_obj_version_t new_version)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        LDISKFS_I(inode)->i_fs_version = new_version;
+        /** Version is set after all inode operations are finished,
+         *  so we should mark it dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
+}
+
 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
                         void **data)
 {
@@ -2106,6 +2135,8 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
         .do_data_get     = osd_data_get,
 };
 
@@ -2131,6 +2162,8 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
         .do_data_get     = osd_data_get,
 };
 
index 3660c7f..a0ccb07 100644 (file)
@@ -13,7 +13,7 @@ ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
 ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
 ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
 ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
-ptlrpc_objs += sec_null.o sec_plain.o
+ptlrpc_objs += sec_null.o sec_plain.o target.o
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 
index d2a5cf9..375fb72 100644 (file)
@@ -940,6 +940,27 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
 }
 
 /**
+ * Save the pre-versions from the reply into the request, for use on replay.
+ */
+static void ptlrpc_save_versions(struct ptlrpc_request *req)
+{
+        struct lustre_msg *repmsg = req->rq_repmsg;
+        struct lustre_msg *reqmsg = req->rq_reqmsg;
+        __u64 *versions = lustre_msg_get_versions(repmsg);
+        ENTRY;
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                return;
+
+        LASSERT(versions);
+        lustre_msg_set_versions(reqmsg, versions);
+        CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
+               versions[0], versions[1]);
+
+        EXIT;
+}
+
+/**
  * Callback function called when client receives RPC reply for \a req.
  */
 static int after_reply(struct ptlrpc_request *req)
@@ -1027,7 +1048,7 @@ static int after_reply(struct ptlrpc_request *req)
                 lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
         }
 
-        if (req->rq_import->imp_replayable) {
+        if (imp->imp_replayable) {
                 spin_lock(&imp->imp_lock);
                 /*
                  * No point in adding already-committed requests to the replay
@@ -1036,9 +1057,11 @@ static int after_reply(struct ptlrpc_request *req)
                 if (req->rq_transno != 0 &&
                     (req->rq_transno >
                      lustre_msg_get_last_committed(req->rq_repmsg) ||
-                     req->rq_replay))
+                     req->rq_replay)) {
+                        /** version recovery */
+                        ptlrpc_save_versions(req);
                         ptlrpc_retain_replayable_request(req, imp);
-                else if (req->rq_commit_cb != NULL) {
+                else if (req->rq_commit_cb != NULL) {
                         spin_unlock(&imp->imp_lock);
                         req->rq_commit_cb(req);
                         spin_lock(&imp->imp_lock);
@@ -2328,13 +2351,31 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
              lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
 
-        /* The transno had better not change over replay. */
-        LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
-                 lustre_msg_get_transno(req->rq_repmsg) ||
-                 lustre_msg_get_transno(req->rq_repmsg) == 0,
-                 LPX64"/"LPX64"\n",
-                 lustre_msg_get_transno(req->rq_reqmsg),
-                 lustre_msg_get_transno(req->rq_repmsg));
+        /** VBR: a -EOVERFLOW reply status means the version check failed */
+        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
+                /** replay failed due to a version mismatch */
+                DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+                spin_lock(&imp->imp_lock);
+                imp->imp_vbr_failed = 1;
+                imp->imp_no_lock_replay = 1;
+                spin_unlock(&imp->imp_lock);
+        } else {
+                /** The transno had better not change over replay. */
+                LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
+                         lustre_msg_get_transno(req->rq_repmsg) ||
+                         lustre_msg_get_transno(req->rq_repmsg) == 0,
+                         LPX64"/"LPX64"\n",
+                         lustre_msg_get_transno(req->rq_reqmsg),
+                         lustre_msg_get_transno(req->rq_repmsg));
+        }
+
+        spin_lock(&imp->imp_lock);
+        /** replay by version implies a gap on the server: don't trust locks */
+        if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
+                imp->imp_no_lock_replay = 1;
+        imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        spin_unlock(&imp->imp_lock);
+        LASSERT(imp->imp_last_replay_transno);
 
         DEBUG_REQ(D_HA, req, "got rep");
 
index eef7b7e..baa4fb8 100644 (file)
@@ -1149,12 +1149,23 @@ static int completed_replay_interpret(const struct lu_env *env,
 {
         ENTRY;
         atomic_dec(&req->rq_import->imp_replay_inflight);
-        if (req->rq_status == 0) {
+        if (req->rq_status == 0 &&
+            !req->rq_import->imp_vbr_failed) {
                 ptlrpc_import_recovery_state_machine(req->rq_import);
         } else {
-                CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
-                       "reconnecting\n",
-                       req->rq_import->imp_obd->obd_name, req->rq_status);
+                if (req->rq_import->imp_vbr_failed) {
+                        CDEBUG(D_WARNING,
+                               "%s: version recovery fails, reconnecting\n",
+                               req->rq_import->imp_obd->obd_name);
+                        spin_lock(&req->rq_import->imp_lock);
+                        req->rq_import->imp_vbr_failed = 0;
+                        spin_unlock(&req->rq_import->imp_lock);
+                } else {
+                        CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
+                                     "reconnecting\n",
+                               req->rq_import->imp_obd->obd_name,
+                               req->rq_status);
+                }
                 ptlrpc_connect_import(req->rq_import, NULL);
         }
 
index 93168eb..51e109b 100644 (file)
@@ -774,6 +774,12 @@ void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
         return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
 }
 
+static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
+{
+        return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
+                                 sizeof(struct ptlrpc_body));
+}
+
 __u32 lustre_msghdr_get_flags(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
@@ -809,9 +815,7 @@ __u32 lustre_msg_get_flags(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -829,9 +833,7 @@ void lustre_msg_add_flags(struct lustre_msg *msg, int flags)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_flags |= flags;
                 return;
@@ -845,9 +847,7 @@ void lustre_msg_set_flags(struct lustre_msg *msg, int flags)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_flags = flags;
                 return;
@@ -862,9 +862,7 @@ void lustre_msg_clear_flags(struct lustre_msg *msg, int flags)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_flags &= ~(MSG_GEN_FLAG_MASK & flags);
                 return;
@@ -879,9 +877,7 @@ __u32 lustre_msg_get_op_flags(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -897,9 +893,7 @@ void lustre_msg_add_op_flags(struct lustre_msg *msg, int flags)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_op_flags |= flags;
                 return;
@@ -913,9 +907,7 @@ void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_op_flags |= flags;
                 return;
@@ -930,9 +922,7 @@ struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return NULL;
@@ -950,9 +940,7 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return PTL_RPC_MSG_ERR;
@@ -970,9 +958,7 @@ __u32 lustre_msg_get_version(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -990,9 +976,7 @@ void lustre_msg_add_version(struct lustre_msg *msg, int version)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_version |= version;
                 return;
@@ -1007,9 +991,7 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1027,9 +1009,7 @@ __u64 lustre_msg_get_last_xid(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1047,9 +1027,7 @@ __u64 lustre_msg_get_last_committed(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1062,14 +1040,31 @@ __u64 lustre_msg_get_last_committed(struct lustre_msg *msg)
         }
 }
 
+__u64 *lustre_msg_get_versions(struct lustre_msg *msg)
+{
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return NULL;
+        case LUSTRE_MSG_MAGIC_V2: {
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+                if (!pb) {
+                        CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+                        return NULL;
+                }
+                return pb->pb_pre_versions;
+        }
+        default:
+                CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+                return NULL;
+        }
+}
+
 __u64 lustre_msg_get_transno(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1087,9 +1082,7 @@ int lustre_msg_get_status(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return -EINVAL;
@@ -1108,9 +1101,7 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return -EINVAL;
@@ -1129,9 +1120,7 @@ void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return;
@@ -1150,9 +1139,7 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return -EINVAL;
@@ -1171,9 +1158,7 @@ void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return;
@@ -1192,9 +1177,7 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg)
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1238,9 +1221,7 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg)
                 return 0;
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1262,9 +1243,7 @@ __u32 lustre_msg_get_service_time(struct lustre_msg *msg)
                 return 0;
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 if (!pb) {
                         CERROR("invalid msg %p: no ptlrpc body!\n", msg);
                         return 0;
@@ -1301,8 +1280,7 @@ __u32 lustre_msg_calc_cksum(struct lustre_msg *msg)
                 return 0;
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED: {
-                struct ptlrpc_body *pb;
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 return crc32_le(~(__u32)0, (unsigned char *)pb, sizeof(*pb));
         }
@@ -1316,9 +1294,7 @@ void lustre_msg_set_handle(struct lustre_msg *msg, struct lustre_handle *handle)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_handle = *handle;
                 return;
@@ -1332,9 +1308,7 @@ void lustre_msg_set_type(struct lustre_msg *msg, __u32 type)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_type = type;
                 return;
@@ -1348,9 +1322,7 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_opc = opc;
                 return;
@@ -1364,9 +1336,7 @@ void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_last_xid = last_xid;
                 return;
@@ -1380,9 +1350,7 @@ void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_last_committed = last_committed;
                 return;
@@ -1392,13 +1360,30 @@ void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed)
         }
 }
 
-void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno)
+void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions)
 {
         switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return;
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+                LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
+                pb->pb_pre_versions[0] = versions[0];
+                pb->pb_pre_versions[1] = versions[1];
+                pb->pb_pre_versions[2] = versions[2];
+                pb->pb_pre_versions[3] = versions[3];
+                return;
+        }
+        default:
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+        }
+}
 
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno)
+{
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V2: {
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_transno = transno;
                 return;
@@ -1412,9 +1397,7 @@ void lustre_msg_set_status(struct lustre_msg *msg, __u32 status)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_status = status;
                 return;
@@ -1428,9 +1411,7 @@ void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_conn_cnt = conn_cnt;
                 return;
@@ -1446,9 +1427,7 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout)
         case LUSTRE_MSG_MAGIC_V1:
                 return;
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_timeout = timeout;
                 return;
@@ -1464,9 +1443,7 @@ void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time)
         case LUSTRE_MSG_MAGIC_V1:
                 return;
         case LUSTRE_MSG_MAGIC_V2: {
-                struct ptlrpc_body *pb;
-
-                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
                 LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
                 pb->pb_service_time = service_time;
                 return;
index daec0f5..dfcc33c 100644 (file)
@@ -124,6 +124,12 @@ void ptlrpc_ping_import_soon(struct obd_import *imp)
         imp->imp_next_ping = cfs_time_current();
 }
 
+static inline int imp_is_deactive(struct obd_import *imp)
+{
+        return (imp->imp_deactive ||
+                OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_IMP_DEACTIVE));
+}
+
 static inline int ptlrpc_next_reconnect(struct obd_import *imp)
 {
         if (imp->imp_server_timeout)
@@ -237,13 +243,13 @@ static void ptlrpc_pinger_process_import(struct obd_import *imp,
                              this_ping) && force == 0)
                 return;
 
-        if (level == LUSTRE_IMP_DISCON && !imp->imp_deactive) {
+        if (level == LUSTRE_IMP_DISCON && !imp_is_deactive(imp)) {
                 /* wait at least a timeout before trying recovery again */
                 imp->imp_next_ping = ptlrpc_next_reconnect(imp);
                 ptlrpc_initiate_recovery(imp);
         } else if (level != LUSTRE_IMP_FULL ||
                    imp->imp_obd->obd_no_recov ||
-                   imp->imp_deactive) {
+                   imp_is_deactive(imp)) {
                 CDEBUG(D_HA, "not pinging %s (in recovery "
                        " or recovery disabled: %s)\n",
                        obd2cli_tgt(imp->imp_obd),
@@ -939,11 +945,13 @@ void ptlrpc_pinger_wake_up()
                 CDEBUG(D_RPCTRACE, "checking import %s->%s\n",
                        imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
 #ifdef ENABLE_LIBLUSTRE_RECOVERY
-                if (imp->imp_state == LUSTRE_IMP_DISCON && !imp->imp_deactive)
+                if (imp->imp_state == LUSTRE_IMP_DISCON &&
+                    !imp_is_deactive(imp))
 #else
                 /*XXX only recover for the initial connection */
                 if (!lustre_handle_is_used(&imp->imp_remote_handle) &&
-                    imp->imp_state == LUSTRE_IMP_DISCON && !imp->imp_deactive)
+                    imp->imp_state == LUSTRE_IMP_DISCON &&
+                    !imp_is_deactive(imp))
 #endif
                         ptlrpc_initiate_recovery(imp);
                 else if (imp->imp_state != LUSTRE_IMP_FULL)
@@ -951,7 +959,7 @@ void ptlrpc_pinger_wake_up()
                                      "state %d, deactive %d\n",
                                      imp->imp_obd->obd_uuid.uuid,
                                      obd2cli_tgt(imp->imp_obd), imp->imp_state,
-                                     imp->imp_deactive);
+                                     imp_is_deactive(imp));
         }
         EXIT;
 #endif
index 681873d..7a9e4d5 100644 (file)
@@ -288,6 +288,7 @@ EXPORT_SYMBOL(lustre_msg_add_version);
 EXPORT_SYMBOL(lustre_msg_get_opc);
 EXPORT_SYMBOL(lustre_msg_get_last_xid);
 EXPORT_SYMBOL(lustre_msg_get_last_committed);
+EXPORT_SYMBOL(lustre_msg_get_versions);
 EXPORT_SYMBOL(lustre_msg_get_transno);
 EXPORT_SYMBOL(lustre_msg_get_status);
 EXPORT_SYMBOL(lustre_msg_get_slv);
@@ -302,6 +303,7 @@ EXPORT_SYMBOL(lustre_msg_set_type);
 EXPORT_SYMBOL(lustre_msg_set_opc);
 EXPORT_SYMBOL(lustre_msg_set_last_xid);
 EXPORT_SYMBOL(lustre_msg_set_last_committed);
+EXPORT_SYMBOL(lustre_msg_set_versions);
 EXPORT_SYMBOL(lustre_msg_set_transno);
 EXPORT_SYMBOL(lustre_msg_set_status);
 EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
index ef02ea7..3dad818 100644 (file)
@@ -336,7 +336,7 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
 static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
 {
         struct llog_canceld_ctxt *llcd;
-
+        LASSERT(ctxt);
         llcd = llcd_alloc(ctxt->loc_lcm);
         if (!llcd) {
                 CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
@@ -597,7 +597,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         }
         lcm = ctxt->loc_lcm;
         CDEBUG(D_INFO, "cancel on lsm %p\n", lcm);
-               
+
         /*
          * Let's check if we have all structures alive. We also check for
          * possible shutdown. Do nothing if we're stopping.
index 4adf785..7f27502 100644 (file)
@@ -116,17 +116,12 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
                    req and send it again. If, however, the last sent
                    transno has been committed then we continue replay
                    from the next request. */
-                if (imp->imp_resend_replay && 
-                    req->rq_transno == last_transno) {
-                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-                        break;
-                }
-
                 if (req->rq_transno > last_transno) {
-                        imp->imp_last_replay_transno = req->rq_transno;
+                        if (imp->imp_resend_replay)
+                                lustre_msg_add_flags(req->rq_reqmsg,
+                                                     MSG_RESENT);
                         break;
                 }
-
                 req = NULL;
         }
 
index 99f79a4..37dcbde 100644 (file)
@@ -354,11 +354,41 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         EXIT;
 }
 
-void
-ptlrpc_commit_replies (struct obd_device *obd)
+void ptlrpc_commit_replies_alt(struct obd_export *exp)
 {
-        struct list_head   *tmp;
-        struct list_head   *nxt;
+        struct ptlrpc_reply_state *rs, *nxt;
+        struct list_head committed_list;
+        DECLARE_RS_BATCH(batch);
+        ENTRY;
+
+        CFS_INIT_LIST_HEAD(&committed_list);
+        spin_lock(&exp->exp_uncommitted_replies_lock);
+        list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+                                 rs_obd_list) {
+                LASSERT (rs->rs_difficult);
+                LASSERT(rs->rs_export);
+                if (likely(rs->rs_transno <= exp->exp_last_committed))
+                        list_move(&rs->rs_obd_list, &committed_list);
+                else
+                        break;
+        }
+        spin_unlock(&exp->exp_uncommitted_replies_lock);
+
+        /* XXX: do we need this in the context of a commit callback? Maybe a
+         * separate thread should handle it */
+        rs_batch_init(&batch);
+        /* get replies that have been committed and get their service
+         * to attend to complete them. */
+        list_for_each_entry_safe(rs, nxt, &committed_list, rs_obd_list) {
+                list_del_init(&rs->rs_obd_list);
+                rs_batch_add(&batch, rs);
+        }
+        rs_batch_fini(&batch);
+        EXIT;
+}
+void ptlrpc_commit_replies(struct obd_export *exp)
+{
+        struct ptlrpc_reply_state *rs, *nxt;
         DECLARE_RS_BATCH(batch);
         ENTRY;
 
@@ -367,19 +397,18 @@ ptlrpc_commit_replies (struct obd_device *obd)
          * to attend to complete them. */
 
         /* CAVEAT EMPTOR: spinlock ordering!!! */
-        spin_lock(&obd->obd_uncommitted_replies_lock);
-        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
-                struct ptlrpc_reply_state *rs =
-                        list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
-
+        spin_lock(&exp->exp_uncommitted_replies_lock);
+        list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+                                 rs_obd_list) {
                 LASSERT (rs->rs_difficult);
-
-                if (rs->rs_transno <= obd->obd_last_committed) {
+                /* VBR: per-export last_committed */
+                LASSERT(rs->rs_export);
+                if (rs->rs_transno <= exp->exp_last_committed) {
                         list_del_init(&rs->rs_obd_list);
                         rs_batch_add(&batch, rs);
                 }
         }
-        spin_unlock(&obd->obd_uncommitted_replies_lock);
+        spin_unlock(&exp->exp_uncommitted_replies_lock);
         rs_batch_fini(&batch);
         EXIT;
 }
@@ -532,14 +561,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         array->paa_count = 0;
         array->paa_deadline = -1;
 
-        /* allocate memory for srv_at_array (ptlrpc_at_array) */ 
+        /* allocate memory for srv_at_array (ptlrpc_at_array) */
         OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
         if (array->paa_reqs_array == NULL)
                 GOTO(failed, NULL);
 
         for (index = 0; index < size; index++)
                 CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
-        
+
         OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
         if (array->paa_reqs_count == NULL)
                 GOTO(failed, NULL);
@@ -706,8 +735,8 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
         if (req->rq_at_linked) {
                 struct ptlrpc_at_array *array = &svc->srv_at_array;
                 __u32 index = req->rq_at_index;
-        
-                req->rq_at_linked = 0;        
+
+                req->rq_at_linked = 0;
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
         }
@@ -1096,7 +1125,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                                 rq->rq_at_linked = 0;
                                 continue;
                         }
-                        
+
                         /* update the earliest deadline */
                         if (deadline == -1 || rq->rq_deadline < deadline)
                                 deadline = rq->rq_deadline;
@@ -1674,12 +1703,12 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
         list_del_init (&rs->rs_exp_list);
         spin_unlock (&exp->exp_lock);
 
-        /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that
+        /* Avoid exp_uncommitted_replies_lock contention if we are 100% sure
          * rs has been removed from the list already */
         if (!list_empty_careful(&rs->rs_obd_list)) {
-                spin_lock(&obd->obd_uncommitted_replies_lock);
+                spin_lock(&exp->exp_uncommitted_replies_lock);
                 list_del_init(&rs->rs_obd_list);
-                spin_unlock(&obd->obd_uncommitted_replies_lock);
+                spin_unlock(&exp->exp_uncommitted_replies_lock);
         }
 
         spin_lock(&rs->rs_lock);
@@ -2482,17 +2511,17 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         if (array->paa_reqs_array != NULL) {
-                OBD_FREE(array->paa_reqs_array, 
+                OBD_FREE(array->paa_reqs_array,
                          sizeof(struct list_head) * array->paa_size);
                 array->paa_reqs_array = NULL;
         }
-        
+
         if (array->paa_reqs_count != NULL) {
-                OBD_FREE(array->paa_reqs_count, 
+                OBD_FREE(array->paa_reqs_count,
                          sizeof(__u32) * array->paa_size);
                 array->paa_reqs_count= NULL;
         }
-       
+
         OBD_FREE_PTR(service);
         RETURN(0);
 }
diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c
new file mode 100644 (file)
index 0000000..0be3950
--- /dev/null
@@ -0,0 +1,365 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * Lustre Common Target
+ * These are common functions for MDT and OST recovery-related functionality
+ *
+ *   Author: Mikhail Pershin <tappro@sun.com>
+ */
+
+#include <obd.h>
+#include <lustre_fsfilt.h>
+/**
+ * Update client data in last_rcvd file. An obd API
+ */
+static int obt_client_data_update(struct obd_export *exp)
+{
+        struct lu_export_data *led = &exp->exp_target_data;
+        struct obd_device_target *obt = &exp->exp_obd->u.obt;
+        loff_t off = led->led_lr_off;
+        int rc = 0;
+
+        rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
+                                 led->led_lcd, sizeof(*led->led_lcd), &off, 0);
+
+        CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+               led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
+               le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
+
+        return rc;
+}
+
+/**
+ * Update server data in last_rcvd file. An obd API
+ */
+int obt_server_data_update(struct obd_device *obd, int force_sync)
+{
+        struct obd_device_target *obt = &obd->u.obt;
+        loff_t off = 0;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_SUPER,
+               "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+               obt->obt_lsd->lsd_uuid,
+               le64_to_cpu(obt->obt_lsd->lsd_mount_count),
+               le64_to_cpu(obt->obt_lsd->lsd_last_transno));
+
+        rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
+                                 sizeof(*obt->obt_lsd), &off, force_sync);
+        if (rc)
+                CERROR("error writing lr_server_data: rc = %d\n", rc);
+
+        RETURN(rc);
+}
+
+/**
+ * Raise the client's last epoch to the server's start epoch if it is older
+ */
+void obt_client_epoch_update(struct obd_export *exp)
+{
+        struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+        struct obd_device_target *obt = &exp->exp_obd->u.obt;
+
+        /** VBR: set client last_epoch to current epoch */
+        if (le32_to_cpu(lcd->lcd_last_epoch) >=
+            le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
+                return;
+        lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
+        obt_client_data_update(exp);
+}
+
+/**
+ * Increment server epoch. An obd API
+ */
+static void obt_boot_epoch_update(struct obd_device *obd)
+{
+        __u32 start_epoch;
+        struct obd_device_target *obt = &obd->u.obt;
+        struct ptlrpc_request *req;
+        struct list_head client_list;
+
+        spin_lock(&obt->obt_translock);
+        start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
+        obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
+                                            LR_EPOCH_BITS);
+        obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
+        spin_unlock(&obt->obt_translock);
+
+        CFS_INIT_LIST_HEAD(&client_list);
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        list_splice_init(&obd->obd_final_req_queue, &client_list);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /**
+         * go through the list of exports that participated in recovery and
+         * set the new epoch for them
+         */
+        list_for_each_entry(req, &client_list, rq_list) {
+                LASSERT(!req->rq_export->exp_delayed);
+                obt_client_epoch_update(req->rq_export);
+        }
+        /** splice the whole list back onto the final queue */
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        list_splice_init(&client_list, &obd->obd_final_req_queue);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        obt_server_data_update(obd, 1);
+}
+
+/**
+ * Write data to the last_rcvd file.
+ */
+static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
+                               const struct lu_buf *buf, loff_t *off, int sync)
+{
+        struct thandle *th;
+        struct txn_param p;
+        int rc, credits;
+        ENTRY;
+
+        credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
+                                                         DTO_WRITE_BLOCK);
+        txn_param_init(&p, credits);
+
+        th = dt_trans_start(env, lut->lut_bottom, &p);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
+        dt_trans_stop(env, lut->lut_bottom, th);
+
+        CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
+               "uuid = %s\nlast_transno = "LPU64"\n",
+               rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
+
+        RETURN(rc);
+}
+
+/**
+ * Update client data in last_rcvd
+ */
+int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
+                            struct obd_export *exp)
+{
+        struct lu_export_data *led = &exp->exp_target_data;
+        struct lsd_client_data tmp_lcd;
+        loff_t tmp_off = led->led_lr_off;
+        struct lu_buf tmp_buf = {
+                                        .lb_buf = &tmp_lcd,
+                                        .lb_len = sizeof(tmp_lcd)
+                                };
+        int rc = 0;
+
+        lcd_cpu_to_le(led->led_lcd, &tmp_lcd);
+        LASSERT(lut->lut_last_rcvd);
+        rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
+
+        return rc;
+}
+
+/**
+ * Update server data in last_rcvd
+ */
+static int lut_server_data_update(const struct lu_env *env,
+                                  struct lu_target *lut, int sync)
+{
+        struct lr_server_data tmp_lsd;
+        loff_t tmp_off = 0;
+        struct lu_buf tmp_buf = {
+                                        .lb_buf = &tmp_lsd,
+                                        .lb_len = sizeof(tmp_lsd)
+                                };
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_SUPER,
+               "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+               lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
+               lut->lut_last_transno);
+
+        spin_lock(&lut->lut_translock);
+        lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
+        spin_unlock(&lut->lut_translock);
+
+        lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
+        if (lut->lut_last_rcvd != NULL)
+                rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
+        RETURN(rc);
+}
+
+void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
+                             struct obd_export *exp)
+{
+        struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+
+        LASSERT(lut->lut_bottom);
+        /** VBR: set client last_epoch to current epoch */
+        if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
+                return;
+        lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
+        lut_client_data_update(env, lut, exp);
+}
+
+/**
+ * Update boot epoch when recovery ends
+ */
+void lut_boot_epoch_update(struct lu_target *lut)
+{
+        struct lu_env env;
+        struct ptlrpc_request *req;
+        __u32 start_epoch;
+        struct list_head client_list;
+        int rc;
+
+        if (lut->lut_obd->obd_stopping)
+                return;
+        /** Increase server epoch after recovery */
+        if (lut->lut_bottom == NULL)
+                return obt_boot_epoch_update(lut->lut_obd);
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc) {
+                CERROR("Can't initialize environment rc=%i\n", rc);
+                return;
+        }
+
+        spin_lock(&lut->lut_translock);
+        start_epoch = lr_epoch(lut->lut_last_transno) + 1;
+        lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
+        lut->lut_lsd.lsd_start_epoch = start_epoch;
+        spin_unlock(&lut->lut_translock);
+
+        CFS_INIT_LIST_HEAD(&client_list);
+        /**
+         * Recovery is not yet finished and the final queue can still be updated
+         * with resend requests. Move it to a separate list for processing.
+         */
+        spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+        list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
+        spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+
+        /**
+         * go through the list of exports that participated in recovery and
+         * set the new epoch for them
+         */
+        list_for_each_entry(req, &client_list, rq_list) {
+                LASSERT(!req->rq_export->exp_delayed);
+                lut_client_epoch_update(&env, lut, req->rq_export);
+        }
+        /** splice the whole list back onto the final queue */
+        spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+        list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
+        spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+        /** update server epoch */
+        lut_server_data_update(&env, lut, 1);
+        lu_env_fini(&env);
+}
+EXPORT_SYMBOL(lut_boot_epoch_update);
+
+/**
+ * Commit callback: updates the obd and per-export last_committed values
+ */
+void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
+                           void *data, int err)
+{
+        struct obd_export *exp = data;
+
+        spin_lock(&lut->lut_translock);
+        if (transno > lut->lut_obd->obd_last_committed)
+                lut->lut_obd->obd_last_committed = transno;
+
+        LASSERT(exp);
+        if (!lut->lut_obd->obd_stopping &&
+            transno > exp->exp_last_committed) {
+                exp->exp_last_committed = transno;
+                spin_unlock(&lut->lut_translock);
+                ptlrpc_commit_replies(exp);
+        } else {
+                spin_unlock(&lut->lut_translock);
+        }
+        if (transno)
+                CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
+                       lut->lut_obd->obd_name, transno);
+}
+EXPORT_SYMBOL(lut_cb_last_committed);
+
+void lut_cb_client(struct lu_target *lut, __u64 transno,
+                       void *data, int err)
+{
+        LASSERT(lut->lut_obd);
+        target_client_add_cb(lut->lut_obd, transno, data, err);
+}
+EXPORT_SYMBOL(lut_cb_client);
+
+int lut_init(const struct lu_env *env, struct lu_target *lut,
+             struct obd_device *obd, struct dt_device *dt)
+{
+        struct lu_fid fid;
+        struct dt_object *o;
+        int rc = 0;
+        ENTRY;
+
+        lut->lut_obd = obd;
+        lut->lut_bottom = dt;
+        lut->lut_last_rcvd = NULL;
+
+        spin_lock_init(&lut->lut_translock);
+        spin_lock_init(&lut->lut_client_bitmap_lock);
+        spin_lock_init(&lut->lut_trans_table_lock);
+
+        /** obdfilter has no lu_device stack yet */
+        if (dt == NULL)
+                RETURN(rc);
+        o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
+        if (!IS_ERR(o)) {
+                lut->lut_last_rcvd = o;
+        } else {
+                rc = PTR_ERR(o);
+                CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+        }
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(lut_init);
+
+void lut_fini(const struct lu_env *env, struct lu_target *lut)
+{
+        ENTRY;
+        if (lut->lut_last_rcvd)
+                lu_object_put(env, &lut->lut_last_rcvd->do_lu);
+        lut->lut_last_rcvd = NULL;
+        EXIT;
+}
+EXPORT_SYMBOL(lut_fini);
index c04abbe..ef3c783 100644 (file)
@@ -399,6 +399,11 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+        CLASSERT(PTLRPC_NUM_VERSIONS == 4);
+        LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n",
+                 (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
+        LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n",
+                 (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]));
 
         /* Checks for struct obd_connect_data */
         LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
@@ -481,6 +486,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+        CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL);
         CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
 
         /* Checks for struct obdo */
index 884d9fe..a3711d8 100644 (file)
@@ -11,7 +11,7 @@ noinst_SCRIPTS += sanity.sh rundbench acceptance-small.sh compile.sh
 noinst_SCRIPTS += conf-sanity.sh insanity.sh lfscktest.sh oos.sh oos2.sh
 noinst_SCRIPTS += llog-test.sh recovery-small.sh replay-dual.sh sanity-quota.sh
 noinst_SCRIPTS += replay-ost-single.sh replay-single.sh run-llog.sh sanityN.sh
-noinst_SCRIPTS += runracer
+noinst_SCRIPTS += runracer replay-vbr.sh
 noinst_SCRIPTS += performance-sanity.sh mdsrate-create-small.sh
 noinst_SCRIPTS += mdsrate-create-large.sh mdsrate-lookup-1dir.sh
 noinst_SCRIPTS += mdsrate-stat-small.sh mdsrate-stat-large.sh
index fc79347..3fd3517 100755 (executable)
@@ -1459,19 +1459,19 @@ run_test 60 "test llog post recovery init vs llog unlink"
 #test race  llog recovery thread vs llog cleanup
 test_61a() {   # was test_61
     remote_ost_nodsh && skip "remote OST with nodsh" && return 0
-    
+
     mkdir -p $DIR/$tdir
     createmany -o $DIR/$tdir/$tfile-%d 800
-    replay_barrier ost1 
-#   OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221 
-    unlinkmany $DIR/$tdir/$tfile-%d 800 
+    replay_barrier ost1
+#   OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+    unlinkmany $DIR/$tdir/$tfile-%d 800
     set_nodes_failloc "$(osts_nodes)" 0x80000221
     facet_failover ost1
-    sleep 10 
+    sleep 10
     fail ost1
     sleep 30
     set_nodes_failloc "$(osts_nodes)" 0x0
-    
+
     $CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1
     rmdir $DIR/$tdir
 }
@@ -1481,7 +1481,7 @@ run_test 61a "test race llog recovery vs llog cleanup"
 test_61b() {
 #   OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x13a
     do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000013a"
-    facet_failover $SINGLEMDS 
+    facet_failover $SINGLEMDS
     sleep 10
     fail $SINGLEMDS
     do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1
@@ -1492,10 +1492,10 @@ run_test 61b "test race mds llog sync vs llog cleanup"
 test_61c() {
     remote_ost_nodsh && skip "remote OST with nodsh" && return 0
 
-#   OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222 
-    touch $DIR/$tfile 
+#   OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
+    touch $DIR/$tfile
     set_nodes_failloc "$(osts_nodes)" 0x80000222
-    rm $DIR/$tfile    
+    rm $DIR/$tfile
     sleep 10
     fail ost1
     set_nodes_failloc "$(osts_nodes)" 0x0
@@ -1850,6 +1850,54 @@ test_70b () {
 run_test 70b "mds recovery; $CLIENTCOUNT clients"
 # end multi-client tests
 
+test_73a() {
+    multiop_bg_pause $DIR/$tfile O_tSc || return 3
+    pid=$!
+    rm -f $DIR/$tfile
+
+    replay_barrier $SINGLEMDS
+#define OBD_FAIL_LDLM_ENQUEUE       0x302
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000302"
+    fail $SINGLEMDS
+    kill -USR1 $pid
+    wait $pid || return 1
+    [ -e $DIR/$tfile ] && return 2
+    return 0
+}
+run_test 73a "open(O_CREAT), unlink, replay, reconnect before open replay , close"
+
+test_73b() {
+    multiop_bg_pause $DIR/$tfile O_tSc || return 3
+    pid=$!
+    rm -f $DIR/$tfile
+
+    replay_barrier $SINGLEMDS
+#define OBD_FAIL_LDLM_REPLY       0x30c
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000030c"
+    fail $SINGLEMDS
+    kill -USR1 $pid
+    wait $pid || return 1
+    [ -e $DIR/$tfile ] && return 2
+    return 0
+}
+run_test 73b "open(O_CREAT), unlink, replay, reconnect at open_replay reply, close"
+
+test_73c() {
+    multiop_bg_pause $DIR/$tfile O_tSc || return 3
+    pid=$!
+    rm -f $DIR/$tfile
+
+    replay_barrier $SINGLEMDS
+#define OBD_FAIL_TGT_LAST_REPLAY       0x710
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000710"
+    fail $SINGLEMDS
+    kill -USR1 $pid
+    wait $pid || return 1
+    [ -e $DIR/$tfile ] && return 2
+    return 0
+}
+run_test 73c "open(O_CREAT), unlink, replay, reconnect at last_replay, close"
+
 test_80a() {
     [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
 
diff --git a/lustre/tests/replay-vbr.sh b/lustre/tests/replay-vbr.sh
new file mode 100644 (file)
index 0000000..20b5e08
--- /dev/null
@@ -0,0 +1,701 @@
+#!/bin/bash
+
+# Exit as soon as a command fails (bash errexit).
+set -e
+
+# Tests listed here are always excluded; extend via REPLAY_VBR_EXCEPT.
+# bug number:  16356
+ALWAYS_EXCEPT="2     $REPLAY_VBR_EXCEPT"
+
+SAVE_PWD=$PWD
+PTLDEBUG=${PTLDEBUG:--1}
+LUSTRE=${LUSTRE:-`dirname $0`/..}
+SETUP=${SETUP:-""}
+CLEANUP=${CLEANUP:-""}
+. $LUSTRE/tests/test-framework.sh
+
+init_test_env $@
+
+# Source the configuration; CONFIG defaults to the per-$NAME cfg file.
+. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
+
+# The whole suite is skipped (exit 0) unless CLIENTS is set, at least two
+# clients are configured, and remote_mds_nodsh does not report true.
+[ -n "$CLIENTS" ] || { skip "Need two or more clients" && exit 0; }
+[ $CLIENTCOUNT -ge 2 ] || \
+    { skip "Need two or more clients, have $CLIENTCOUNT" && exit 0; }
+remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0
+
+[ "$SLOW" = "no" ] && EXCEPT_SLOW=""
+
+
+# NOTE(review): the first line below assigns ALWAYS_EXCEPT to itself (a
+# no-op), and the conditional MOUNT_2 reset is redundant because MOUNT_2 is
+# cleared unconditionally right after it.
+[ ! "$NAME" = "ncli" ] && ALWAYS_EXCEPT="$ALWAYS_EXCEPT"
+[ "$NAME" = "ncli" ] && MOUNT_2=""
+MOUNT_2=""
+build_test_filter
+
+check_and_setup_lustre
+# Remove leftovers named d<digit>... or f<digit>... from earlier runs.
+rm -rf $DIR/[df][0-9]*
+
+[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
+
+# Start with all listed clients unmounted; each test mounts what it needs.
+[ "$CLIENTS" ] && zconf_umount_clients $CLIENTS $DIR
+
+# CLIENT1 and CLIENT2 both create files after the replay barrier, then
+# CLIENT2 is unmounted before the MDS failover.  CLIENT1 must still be able
+# to df and to unlink all 50 of its own files; the single file created by
+# CLIENT2 is expected to be gone once CLIENT2 remounts.
+# Returns: 1 if CLIENT1 df fails, 2/3 if CLIENT1's files cannot be unlinked,
+#          0 otherwise.  error() is called if CLIENT2 cannot remount or its
+#          file still exists.
+test_1() {
+    echo "mount client $CLIENT1,$CLIENT2..."
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 1
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+    zconf_umount $CLIENT2 $DIR
+
+    facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
+    do_node $CLIENT1 df $DIR || return 1
+
+    # All 50 files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+    # NOTE(review): this existence check runs on the local node, not via
+    # do_node on a client.
+    [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists"
+
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 1 "lost client doesn't affect another during replay"
+
+# Like test_1, but CLIENT1 also stats the file that CLIENT2 created after
+# the replay barrier, before CLIENT2 is unmounted and the MDS is failed.
+# CLIENT1's 50 files must survive; the file created by CLIENT2 must not.
+# Returns: 1 if CLIENT1 df fails, 2/3 if CLIENT1's files cannot be unlinked,
+#          4 if CLIENT2's file still stats successfully, 0 otherwise.
+# (This test number is listed in ALWAYS_EXCEPT at the top of the script.)
+test_2() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT2 mcreate $DIR/$tdir/$tfile
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    #client1 read data from client2 which will be lost
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+    zconf_umount $CLIENT2 $DIR
+
+    facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
+    do_node $CLIENT1 df $DIR || return 1
+
+    # All 50 files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile && return 4
+
+    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 2 "lost data due to missed REMOTE client during replay"
+
+# After the replay barrier CLIENT2 touches $tfile, then CLIENT1 touches and
+# removes it; CLIENT2 is unmounted before the MDS failover.  CLIENT1 must
+# recover, and the removed file must stay removed.
+# Returns: do_facet's status if atime_diff cannot be set, 1 if CLIENT1 df
+#          fails, 2 if $tfile still stats successfully, 0 otherwise.
+test_3a() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    #make sure the time will change
+    local var=${SINGLEMDS}_svc
+    # A bare "return" propagates the failing do_facet status.
+    do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+    sleep 1
+    replay_barrier $SINGLEMDS
+    #change time
+    do_node $CLIENT2 touch $DIR/$tfile
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+    #another change
+    do_node $CLIENT1 touch $DIR/$tfile
+    #remove file
+    do_node $CLIENT1 rm $DIR/$tfile
+    zconf_umount $CLIENT2 $DIR
+
+    facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
+    do_node $CLIENT1 df $DIR || return 1
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile && return 2
+
+    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+    zconf_umount_clients $CLIENTS $DIR
+
+    return 0
+}
+run_test 3a "setattr of time/size doesn't change version"
+
+# After the replay barrier CLIENT2 does chmod +x on $tfile and CLIENT1 does
+# chmod -x; CLIENT2 is unmounted before the MDS failover.  Here CLIENT1's
+# df after failover is expected to FAIL, and $tfile must not end up with
+# mode 755.
+# Returns: do_facet's status if atime_diff cannot be set, 1 if CLIENT1 df
+#          succeeds, 2 if $tfile has mode 755, 0 otherwise.
+test_3b() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    #make sure the time will change
+    local var=${SINGLEMDS}_svc
+    # A bare "return" propagates the failing do_facet status.
+    do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
+
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+    sleep 1
+    replay_barrier $SINGLEMDS
+    #change mode
+    do_node $CLIENT2 chmod +x $DIR/$tfile
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+    #another chmod
+    do_node $CLIENT1 chmod -x $DIR/$tfile
+    zconf_umount $CLIENT2 $DIR
+
+    facet_failover $SINGLEMDS
+    # recovery should fail due to missing client 2
+    do_node $CLIENT1 df $DIR && return 1
+
+    do_node $CLIENT1 $CHECKSTAT -p 755 $DIR/$tfile && return 2
+    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+    zconf_umount_clients $CLIENTS $DIR
+
+    return 0
+}
+run_test 3b "setattr of permissions changes version"
+
+# Announce the deactivation, then set lustre.fail_loc to 0x50d on the
+# given client node.
+#   $1 - client node
+vbr_deactivate_client() {
+    local node=$1
+
+    echo "Deactivating client $node"
+    do_node $node "sysctl -w lustre.fail_loc=0x50d"
+}
+
+# Announce the activation, then reset lustre.fail_loc to 0x0 on the given
+# client node.
+#   $1 - client node
+vbr_activate_client() {
+    local node=$1
+
+    echo "Activating client $node"
+    do_node $node "sysctl -w lustre.fail_loc=0x0"
+}
+
+# Succeeds when "lctl dl" on the given node lists neither an "mdt" nor an
+# "ost" entry, i.e. the node does not appear to host any server device.
+#   $1 - node to inspect
+remote_server ()
+{
+    local node=$1
+
+    # Any mdt line means the node hosts a server: fail right away.
+    [ -z "$(do_node $node lctl dl | grep mdt)" ] || return 1
+
+    # Otherwise the result is decided by the ost check alone.
+    [ -z "$(do_node $node lctl dl | grep ost)" ]
+}
+
+# Delayed recovery: both clients create files after the replay barrier,
+# CLIENT2 is deactivated across the MDS failover and reactivated later.
+# CLIENT1's 50 files and CLIENT2's 25 files must all be present.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df fails, 2/3 if CLIENT1's files cannot be unlinked,
+#          4 if CLIENT2 df fails, 5 if CLIENT2's files cannot be unlinked,
+#          0 otherwise.
+test_4a() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+
+    # All 50 files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+    # All 25 files from client2 should have been replayed
+    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 4a "fail MDS, delayed recovery"
+
+# Delayed open recovery with normal operation in between: both clients
+# create files (createmany -o) after the replay barrier, CLIENT2 is
+# deactivated across the MDS failover, CLIENT1 creates a second set of
+# files, and only then is CLIENT2 reactivated.  All three sets must exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df fails, 2 if CLIENT2 df fails, 3/4 if CLIENT1's
+#          files cannot be unlinked, 5 if CLIENT2's files cannot be unlinked.
+test_4b(){
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    # Create the directory CLIENT2 writes into, as test_4a does, so this
+    # test does not depend on an earlier test having left it behind.
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+
+    # create another set of files
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 2
+
+    # All files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
+    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 4b "fail MDS, normal operation, delayed open recovery"
+
+# Same flow as the "delayed open recovery" test, but the files are created
+# with createmany -m instead of -o: both clients create files after the
+# replay barrier, CLIENT2 is deactivated across the MDS failover, CLIENT1
+# creates a second set, then CLIENT2 is reactivated.  All sets must exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df fails, 2 if CLIENT2 df fails, 3/4 if CLIENT1's
+#          files cannot be unlinked, 5 if CLIENT2's files cannot be unlinked.
+test_4c() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    # Create the directory CLIENT2 writes into so this test does not depend
+    # on an earlier test having left it behind.
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -m $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -m $DIR/$tdir/$tfile-2- 25
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+
+    # create another set of files
+    do_node $CLIENT1 createmany -m $DIR/$tfile-3- 25
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 2
+
+    # All files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
+    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 4c "fail MDS, normal operation, delayed recovery"
+
+# Both clients create files in the same directory ($DIR) after the replay
+# barrier, with CLIENT1's last create issued after CLIENT2's.  CLIENT2 is
+# deactivated across the MDS failover.  CLIENT1's df after failover is
+# expected to FAIL; its first 25 files must exist, its last file must not,
+# and CLIENT2's file must exist once CLIENT2 is reactivated.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df succeeds, 2 if CLIENT2 df fails, 3 if CLIENT1's
+#          first set cannot be unlinked, 4 if CLIENT2's file cannot be
+#          unlinked.  error() is called if $tfile-3-0 exists.
+test_5a() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 1
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR && return 1
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 2
+
+    # First 25 files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+    # The third create is expected to fail because client2 was missing
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-3-0 && error "$tfile-3-0 exists"
+    # The file from client2 should exist
+    do_node $CLIENT2 unlinkmany $DIR/$tfile-2- 1 || return 4
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 5a "fail MDS, delayed recovery should fail"
+
+# Both clients create files in $DIR after the replay barrier; CLIENT2 is
+# deactivated across the MDS failover.  CLIENT1 recovers and creates a
+# second set of files.  When CLIENT2 is reactivated its df is expected to
+# FAIL and its file must not exist, while all 50 of CLIENT1's files must.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df fails, 4 if CLIENT2 df succeeds, 2/3 if
+#          CLIENT1's files cannot be unlinked.  error() is called if
+#          $tfile-2-0 exists.
+test_5b() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
+
+    # create another set of files
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR && return 4
+    # file from client2 should fail
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
+
+    # All 50 files from client 1 should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 5b "fail MDS, normal operation, delayed recovery should fail"
+
+# Two failovers with an interrupted delayed recovery in between.  Both
+# clients create files after the replay barrier and CLIENT2 is deactivated
+# across the first MDS failover.  CLIENT2's fail_val/fail_loc are then set
+# (the notes below say this limits it to 5 replayed requests), a df is
+# issued on it, and after a 5 second pause the MDS is failed again.  All
+# files of both clients must exist afterwards.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if CLIENT1 df fails, 2/3 if CLIENT1's files cannot be unlinked,
+#          5 if CLIENT2's files cannot be unlinked, 0 otherwise.
+test_6a() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    # replay only 5 requests
+    do_node $CLIENT2 "sysctl -w lustre.fail_val=5"
+#define OBD_FAIL_PTLRPC_REPLAY        0x50e
+    do_node $CLIENT2 "sysctl -w lustre.fail_loc=0x2000050e"
+    # The status of this df is not checked.
+    do_node $CLIENT2 df $DIR
+    # vbr_activate_client $CLIENT2
+    # need way to know that client stops replays
+    sleep 5
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+
+    # All files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 6a "fail MDS, delayed recovery, fail MDS"
+
+# Two failovers with a completed delayed recovery in between.  Both clients
+# create files after the replay barrier, CLIENT2 is deactivated across the
+# first MDS failover, reactivated (its df must succeed), and then the MDS
+# is failed a second time.  All files of both clients must exist afterwards.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 4 if CLIENT2 df fails, 1 if CLIENT1 df fails, 2/3 if CLIENT1's
+#          files cannot be unlinked, 5 if CLIENT2's files cannot be
+#          unlinked, 0 otherwise.
+test_7a() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+    vbr_deactivate_client $CLIENT2
+
+    facet_failover $SINGLEMDS
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 1
+
+    # All files should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 7a "fail MDS, delayed recovery, fail MDS"
+
+# Start a paused multiop (command string O_tSc) on a remote client and
+# record its pid.
+#   $1 - client node to run on
+#   $2 - file passed to runmultiop_bg_pause
+# Exports <client>_multiop_pid (the pid read back from the pid file on the
+# client) and <client>_do_node_pid (the pid of the local background do_node
+# job).  Calls error() if no pid could be read; otherwise the return value
+# is the status of the final echo.
+# NOTE(review): the client name is used to build variable names via eval,
+# so it presumably has to be a valid shell identifier - confirm.
+rmultiop_start() {
+    local client=$1
+    local file=$2
+
+    # We need to run do_node in the background, because pdsh does not exit
+    # while a child process of the script it ran still exists.
+    # I.e. pdsh does not exit when runmultiop_bg_pause has exited,
+    # because of multiop_bg_pause -> $MULTIOP_PROG &
+    # For the same reason we need to sleep a bit after do_node starts,
+    # to let runmultiop_bg_pause start multiop and update $pid_file.
+    # Removing $pid_file first guarantees that whatever we read back
+    # afterwards was written by this runmultiop_bg_pause invocation.
+
+    local pid_file=$TMP/multiop_bg.pid.$$
+    do_node $client "rm -f $pid_file && MULTIOP_PID_FILE=$pid_file LUSTRE= runmultiop_bg_pause $file O_tSc" & 
+    local pid=$!
+    sleep 3
+    local multiop_pid
+    multiop_pid=$(do_node $client cat $pid_file)
+    [ -n "$multiop_pid" ] || error "$client : Can not get multiop_pid from $pid_file "
+    eval export ${client}_multiop_pid=$multiop_pid
+    eval export ${client}_do_node_pid=$pid
+    local var=${client}_multiop_pid
+    echo client $client multiop_bg started multiop_pid=${!var}
+    return $?
+}
+
+# Signal the multiop recorded for a client with SIGUSR1 and reap the local
+# background do_node job that was started with it.
+#   $1 - client node; the <client>_multiop_pid and <client>_do_node_pid
+#        variables are read through indirect expansion.
+# The return value is always 0: the status of "wait" is discarded by
+# "|| true", and the status of the remote kill is not checked.
+rmultiop_stop() {
+    local client=$1
+    local multiop_pid=${client}_multiop_pid
+    local do_node_pid=${client}_do_node_pid
+
+    echo "Stopping multiop_pid=${!multiop_pid} (kill ${!multiop_pid} on $client)"
+    do_node $client kill -USR1 ${!multiop_pid}
+
+    wait ${!do_node_pid} || true
+}
+
+# CLIENT2 opens $tfile through a paused multiop and removes it before the
+# replay barrier; multiop is stopped after the barrier.  CLIENT2 is then
+# deactivated across the MDS failover and reactivated afterwards.  Both
+# clients must be able to df, and $tfile must not exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if multiop cannot be started, 2 if it cannot be stopped, 3 if
+#          CLIENT1 df fails, 4 if CLIENT2 df fails, 0 otherwise.  error()
+#          is called if $tfile exists.
+test_8a() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+    do_node $CLIENT2 rm -f $DIR/$tfile
+    replay_barrier $SINGLEMDS
+    rmultiop_stop $CLIENT2 || return 2
+
+    vbr_deactivate_client $CLIENT2
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 3
+    #client2 is back and will try to open orphan
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 8a "orphans are kept until delayed recovery"
+
+# CLIENT2 opens $tfile through a paused multiop before the replay barrier;
+# CLIENT1 removes the file after the barrier.  CLIENT2 is deactivated
+# across the MDS failover, reactivated, and only then is its multiop
+# stopped.  Both clients must be able to df, and $tfile must not exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if multiop cannot be started, 2 if CLIENT1 df fails, 3 if
+#          CLIENT2 df fails, 4 if multiop cannot be stopped, 0 otherwise.
+#          error() is called if $tfile exists.
+test_8b() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 rm -f $DIR/$tfile
+
+    vbr_deactivate_client $CLIENT2
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 2
+    #client2 is back and will try to open orphan
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 3
+
+    # Use a code of its own so a stop failure is not confused with the
+    # start failure above.
+    rmultiop_stop $CLIENT2 || return 4
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 8b "open1 | unlink2 X delayed_replay1, close1"
+
+# CLIENT2 opens $tfile through a paused multiop before the replay barrier;
+# after the barrier CLIENT1 removes the file and CLIENT2's multiop is
+# stopped.  CLIENT2 is then deactivated across the MDS failover and
+# reactivated.  Both clients must be able to df, and $tfile must not exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1 if multiop cannot be started, 2 if it cannot be stopped, 3 if
+#          CLIENT1 df fails, 4 if CLIENT2 df fails, 0 otherwise.  error()
+#          is called if $tfile exists.
+test_8c() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 rm -f $DIR/$tfile
+    rmultiop_stop $CLIENT2 || return 2
+
+    vbr_deactivate_client $CLIENT2
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 3
+    #client2 is back and will try to open orphan
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 8c "open1 | unlink2, close1 X delayed_replay1"
+
+# Both clients open $tfile through paused multiops before the replay
+# barrier; after the barrier CLIENT1 removes the file and both multiops are
+# stopped.  CLIENT2 is then deactivated across the MDS failover and
+# reactivated.  Both clients must be able to df, and $tfile must not exist.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when CLIENT2 has server devices.
+# Returns: 1/2 if a multiop cannot be started, 3/4 if one cannot be
+#          stopped, 6 if CLIENT1 df fails, 8 if CLIENT2 df fails, 0
+#          otherwise.  error() is called if $tfile exists.
+test_8d() {
+    local var=${SINGLEMDS}_svc
+    # Probe for the parameter; only the exit status is of interest.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+    remote_server $CLIENT2 || \
+        { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    rmultiop_start $CLIENT1 $DIR/$tfile || return 1
+    rmultiop_start $CLIENT2 $DIR/$tfile || return 2
+    replay_barrier $SINGLEMDS
+    do_node $CLIENT1 rm -f $DIR/$tfile
+    rmultiop_stop $CLIENT2 || return 3
+    rmultiop_stop $CLIENT1 || return 4
+
+    vbr_deactivate_client $CLIENT2
+    facet_failover $SINGLEMDS
+    do_node $CLIENT1 df $DIR || return 6
+
+    #client2 is back and will try to open orphan
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 8
+
+    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 8d "open1, open2 | unlink2, close1, close2 X delayed_replay1"
+
+# CLIENT1 creates $tfile (mcreate) and a directory before the replay
+# barrier.  After the barrier CLIENT1 touches a file inside that directory,
+# while CLIENT2 removes $tfile and re-creates it with touch.  CLIENT1 is
+# unmounted before the MDS failover, so only CLIENT2 takes part in
+# recovery; the re-created $tfile must exist afterwards.
+# Returns: 1 if CLIENT2's rm fails, 2 if its touch fails, 6 if CLIENT2 df
+#          fails, 0 otherwise.  error() is called if $tfile cannot be
+#          removed at the end.
+test_8e() {
+    zconf_mount $CLIENT1 $DIR
+    zconf_mount $CLIENT2 $DIR
+
+    do_node $CLIENT1 mcreate $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client1 will lead to recovery by versions
+    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+    do_node $CLIENT2 rm $DIR/$tfile || return 1
+    do_node $CLIENT2 touch $DIR/$tfile || return 2
+
+    zconf_umount $CLIENT1 $DIR
+    facet_failover $SINGLEMDS
+    do_node $CLIENT2 df $DIR || return 6
+
+    do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_umount_clients $CLIENTS $DIR
+    return 0
+}
+run_test 8e "create | unlink, create shouldn't fail"
+
+# CLIENT1 creates $tfile (touch) and a directory before the replay barrier.
+# After the barrier CLIENT1 touches a file inside that directory, while
+# CLIENT2 removes $tfile and re-creates it with mcreate.  CLIENT1 is
+# unmounted before the MDS failover; the re-created $tfile must exist
+# afterwards.
+# Returns: 1 if CLIENT2's rm fails, 2 if its mcreate fails, 6 if CLIENT2 df
+#          fails, 0 otherwise.  error() is called if $tfile cannot be
+#          removed at the end.
+test_8f() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client1 will lead to recovery by versions
+    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+    do_node $CLIENT2 rm -f $DIR/$tfile || return 1
+    do_node $CLIENT2 mcreate $DIR/$tfile || return 2
+
+    zconf_umount $CLIENT1 $DIR
+    facet_failover $SINGLEMDS
+    do_node $CLIENT2 df $DIR || return 6
+
+    do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_umount $CLIENT2 $DIR
+    return 0
+}
+run_test 8f "create | unlink, create shouldn't fail"
+
+# CLIENT1 creates $tfile (touch) and a directory before the replay barrier.
+# After the barrier CLIENT1 touches a file inside that directory, while
+# CLIENT2 removes $tfile and creates a DIRECTORY with the same name.
+# CLIENT1 is unmounted before the MDS failover; the new directory must
+# exist afterwards.
+# Returns: 1 if CLIENT2's rm fails, 2 if its mkdir fails, 6 if CLIENT2 df
+#          fails, 0 otherwise.  error() is called if the directory cannot
+#          be removed at the end.
+test_8g() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client1 will lead to recovery by versions
+    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+    do_node $CLIENT2 rm -f $DIR/$tfile || return 1
+    do_node $CLIENT2 mkdir $DIR/$tfile || return 2
+
+    zconf_umount $CLIENT1 $DIR
+    facet_failover $SINGLEMDS
+    do_node $CLIENT2 df $DIR || return 6
+
+    do_node $CLIENT2 rmdir $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_umount $CLIENT2 $DIR
+    return 0
+}
+run_test 8g "create | unlink, create shouldn't fail"
+
+# Version recovery under load: start "rundbench 1 -t 60" on every client in
+# CLIENTS, deactivate CLIENT2 across an MDS failure, wait $TIMEOUT seconds,
+# reactivate CLIENT2 and then wait for every load job to finish.
+# Skipped when the stale_export_age parameter cannot be read on the MDS or
+# when DBENCH_LIB is not set.
+# Returns: 4 if CLIENT2 cannot df after being reactivated.  The exit status
+# of each load job is only printed, not checked.
+test_10 () {
+    local var=${SINGLEMDS}_svc
+    # Skip when the parameter is NOT available (the lookup fails), the same
+    # condition every other delayed-recovery test in this script uses.
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 || \
+        { skip "No delayed recovery support" && return 0; }
+    [ -z "$DBENCH_LIB" ] && skip "DBENCH_LIB is not set" && return 0
+
+    zconf_mount_clients $CLIENTS $DIR
+
+    local duration="-t 60"
+    local cmd="rundbench 1 $duration "
+    local PID=""
+    # Start one background load per client and remember its pid in a
+    # pid.<client> file in the current directory.
+    for CLIENT in ${CLIENTS//,/ }; do
+        $PDSH $CLIENT "set -x; PATH=:$PATH:$LUSTRE/utils:$LUSTRE/tests/:${DBENCH_LIB} DBENCH_LIB=${DBENCH_LIB} $cmd" &
+        PID=$!
+        echo $PID >pid.$CLIENT
+        echo "Started load PID=`cat pid.$CLIENT`"
+    done
+
+    replay_barrier $SINGLEMDS
+    sleep 3 # give clients a time to do operations
+
+    vbr_deactivate_client $CLIENT2
+
+    log "$TESTNAME fail $SINGLEMDS 1"
+    fail $SINGLEMDS
+
+# wait for client to reconnect to MDS
+    sleep $TIMEOUT
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+
+    # Reap every load job and report its exit status.
+    for CLIENT in ${CLIENTS//,/ }; do
+        PID=`cat pid.$CLIENT`
+        wait $PID
+        rc=$?
+        echo "load on ${CLIENT} returned $rc"
+    done
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 10 "mds version recovery; $CLIENTCOUNT clients"
+
+# End of suite: print the closing banner and run the framework cleanup.
+equals_msg `basename $0`: test complete, cleaning up
+#SLEEP=$((`date +%s` - $NOW))
+#[ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP
+check_and_cleanup_lustre
+# If the suite log exists, print it and exit 1 when it contains "FAIL";
+# the trailing "|| true" keeps the status of this list zero otherwise.
+[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true
index c357ec8..e637384 100644 (file)
@@ -4960,6 +4960,7 @@ test_120a() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120a "Early Lock Cancel: mkdir test"
 
@@ -4980,6 +4981,7 @@ test_120b() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120b "Early Lock Cancel: create test"
 
@@ -5002,6 +5004,7 @@ test_120c() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120c "Early Lock Cancel: link test"
 
@@ -5023,6 +5026,7 @@ test_120d() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120d "Early Lock Cancel: setattr test"
 
@@ -5050,6 +5054,7 @@ test_120e() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120e "Early Lock Cancel: unlink test"
 
@@ -5080,6 +5085,7 @@ test_120f() {
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120f "Early Lock Cancel: rename test"
 
@@ -5121,6 +5127,7 @@ test_120g() {
         # wait for commitment of removal
         lru_resize_enable mdc
         lru_resize_enable osc
+#        rm -rf $DIR/$tdir
 }
 run_test 120g "Early Lock Cancel: performance test"
 
index a3f2382..3c8dfaa 100644 (file)
@@ -171,6 +171,8 @@ check_ptlrpc_body(void)
         CHECK_MEMBER(ptlrpc_body, pb_service_time);
         CHECK_MEMBER(ptlrpc_body, pb_slv);
         CHECK_MEMBER(ptlrpc_body, pb_limit);
+        CHECK_CVALUE(PTLRPC_NUM_VERSIONS);
+        CHECK_MEMBER(ptlrpc_body, pb_pre_versions[PTLRPC_NUM_VERSIONS]);
 }
 
 static void check_obd_connect_data(void)
@@ -217,6 +219,7 @@ static void check_obd_connect_data(void)
         CHECK_CDEFINE(OBD_CONNECT_AT);
         CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
         CHECK_CDEFINE(OBD_CONNECT_LRU_RESIZE);
+        CHECK_CDEFINE(OBD_CONNECT_VBR);
         CHECK_CDEFINE(OBD_CONNECT_SKIP_ORPHAN);
 }
 
index 5224858..3862e39 100644 (file)
@@ -396,6 +396,11 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+        CLASSERT(PTLRPC_NUM_VERSIONS == 4);
+        LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n",
+                 (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
+        LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n",
+                 (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]));
 
         /* Checks for struct obd_connect_data */
         LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
@@ -478,6 +483,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+        CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL);
         CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
 
         /* Checks for struct obdo */