Whamcloud - gitweb
LU-2017 mdc: add layout swap between 2 objects
authorJinshan Xiong <jinshan.xiong@intel.com>
Fri, 1 Feb 2013 18:33:09 +0000 (10:33 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 6 Feb 2013 21:11:16 +0000 (16:11 -0500)
This patch adds the client and MDT code to swap layouts
between 2 files:
- an lfs command
- an llapi call, based on a new ioctl
- the mdc/mdt code for the ioctl (with layout locking)

Signed-off-by: JC Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I7e710a1ab3ca38e8b26582e49d08e9943aa445cd
Reviewed-on: http://review.whamcloud.com/4507
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
37 files changed:
lustre/contrib/wireshark/packet-lustre.c
lustre/doc/lfs.1
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/include/lustre/lustreapi.h
lustre/include/lustre_lib.h
lustre/include/lustre_req_layout.h
lustre/include/obd_support.h
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/vvp_io.c
lustre/llite/vvp_object.c
lustre/lov/lov_object.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_mds.c
lustre/obdclass/cl_io.c
lustre/obdclass/lu_object.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/tests/racer/file_swap.sh [new file with mode: 0755]
lustre/tests/sanity.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/req-layout.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index f961f69..1cbfe35 100644 (file)
@@ -187,6 +187,9 @@ typedef enum {
 } obd_cmd_t;
 #define OBD_FIRST_OPC OBD_PING
 
 } obd_cmd_t;
 #define OBD_FIRST_OPC OBD_PING
 
+/* must be kept consistent with the same declaration
+ * in lustre/include/lustre/lustre_idl.h
+ */
 typedef enum {
   MDS_GETATTR      = 33,
   MDS_GETATTR_NAME = 34,
 typedef enum {
   MDS_GETATTR      = 33,
   MDS_GETATTR_NAME = 34,
index 8293655..3f43e38 100644 (file)
@@ -73,6 +73,9 @@ lfs \- Lustre utility to create a file with specific striping pattern, find the
              \fB[-b <block-grace>] [-i <inode-grace>]
              \fB<filesystem>\fR
 .br
              \fB[-b <block-grace>] [-i <inode-grace>]
              \fB<filesystem>\fR
 .br
+.br
+.B lfs swap_layouts <filename1> <filename2>
+.br
 .B lfs data_version [-n] \fB<filename>\fR
 .br
 .B lfs help
 .B lfs data_version [-n] \fB<filename>\fR
 .br
 .B lfs help
@@ -222,6 +225,13 @@ To set filesystem quotas for users or groups. Limits can be specified with -b, -
 .B setquota -t [-u|-g] [--block-grace <block-grace>] [--inode-grace <inode-grace>] <filesystem>
 To set filesystem quota grace times for users or groups. Grace time is specified in "XXwXXdXXhXXmXXs" format or as an integer seconds value, see EXAMPLES
 .TP
 .B setquota -t [-u|-g] [--block-grace <block-grace>] [--inode-grace <inode-grace>] <filesystem>
 To set filesystem quota grace times for users or groups. Grace time is specified in "XXwXXdXXhXXmXXs" format or as an integer seconds value, see EXAMPLES
 .TP
+.B swap_layouts <filename1> <filename2>
+Swap the data (layout and OST objects) of two regular files. The
+two files have to be in the same filesystem, owned by the same user,
+reside on the same MDT and be writable by the user.
+
+Swapping the layout of two directories is not permitted.
+.TP
 .B data_version [-n] <filename>
 Display current version of file data. If -n is specified, data version is read
 without taking lock. As a consequence, data version could be outdated if there
 .B data_version [-n] <filename>
 Display current version of file data. If -n is specified, data version is read
 without taking lock. As a consequence, data version could be outdated if there
index b7604e3..52fae16 100644 (file)
@@ -501,12 +501,16 @@ struct lu_object {
 };
 
 enum lu_object_header_flags {
 };
 
 enum lu_object_header_flags {
-        /**
-         * Don't keep this object in cache. Object will be destroyed as soon
-         * as last reference to it is released. This flag cannot be cleared
-         * once set.
-         */
-        LU_OBJECT_HEARD_BANSHEE = 0
+       /**
+        * Don't keep this object in cache. Object will be destroyed as soon
+        * as last reference to it is released. This flag cannot be cleared
+        * once set.
+        */
+       LU_OBJECT_HEARD_BANSHEE = 0,
+       /**
+        * Mark that this object has already been taken out of cache.
+        */
+       LU_OBJECT_UNHASHED = 1
 };
 
 enum lu_object_header_attr {
 };
 
 enum lu_object_header_attr {
@@ -723,6 +727,7 @@ static inline int lu_object_is_dying(const struct lu_object_header *h)
 
 void lu_object_put(const struct lu_env *env, struct lu_object *o);
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o);
 
 void lu_object_put(const struct lu_env *env, struct lu_object *o);
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o);
+void lu_object_unhash(const struct lu_env *env, struct lu_object *o);
 
 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr);
 
 
 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr);
 
index 0b2adb4..553d6cc 100644 (file)
@@ -1870,6 +1870,7 @@ typedef enum {
        MDS_HSM_REQUEST         = 58,
        MDS_HSM_CT_REGISTER     = 59,
        MDS_HSM_CT_UNREGISTER   = 60,
        MDS_HSM_REQUEST         = 58,
        MDS_HSM_CT_REGISTER     = 59,
        MDS_HSM_CT_UNREGISTER   = 60,
+       MDS_SWAP_LAYOUTS        = 61,
        MDS_LAST_OPC
 } mds_cmd_t;
 
        MDS_LAST_OPC
 } mds_cmd_t;
 
@@ -3444,5 +3445,14 @@ struct update_reply {
 void lustre_swab_update_buf(struct update_buf *ub);
 void lustre_swab_update_reply_buf(struct update_reply *ur);
 
 void lustre_swab_update_buf(struct update_buf *ub);
 void lustre_swab_update_reply_buf(struct update_reply *ur);
 
+/** layout swap request structure
+ * fid1 and fid2 are in mdt_body
+ */
+struct mdc_swap_layouts {
+       __u64           msl_flags;
+} __packed;
+
+void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl);
+
 #endif
 /** @} lustreidl */
 #endif
 /** @} lustreidl */
index 9f1766a..1051c9a 100644 (file)
@@ -197,7 +197,8 @@ typedef struct lu_fid lustre_fid;
 #define LL_IOC_HSM_PROGRESS            _IOW('f', 216, struct hsm_user_request)
 #define LL_IOC_HSM_REQUEST             _IOW('f', 217, struct hsm_user_request)
 #define LL_IOC_DATA_VERSION            _IOR('f', 218, struct ioc_data_version)
 #define LL_IOC_HSM_PROGRESS            _IOW('f', 216, struct hsm_user_request)
 #define LL_IOC_HSM_REQUEST             _IOW('f', 217, struct hsm_user_request)
 #define LL_IOC_DATA_VERSION            _IOR('f', 218, struct ioc_data_version)
-/*     219 is reserved for swap layouts */
+#define LL_IOC_LOV_SWAP_LAYOUTS                _IOW('f', 219, \
+                                               struct lustre_swap_layouts)
 #define LL_IOC_HSM_ACTION              _IOR('f', 220, \
                                                struct hsm_current_action)
 /* see <lustre_lib.h> for ioctl numbers 221-232 */
 #define LL_IOC_HSM_ACTION              _IOR('f', 220, \
                                                struct hsm_current_action)
 /* see <lustre_lib.h> for ioctl numbers 221-232 */
@@ -572,6 +573,12 @@ struct if_quotactl {
         struct obd_uuid         obd_uuid;
 };
 
         struct obd_uuid         obd_uuid;
 };
 
+struct lustre_swap_layouts {
+       __u64   sl_flags;
+       __u32   sl_fd;
+       __u32   sl_gid;
+};
+
 
 /********* Changelogs **********/
 /** Changelog record types */
 
 /********* Changelogs **********/
 /** Changelog record types */
index 949a34a..22cf4ce 100644 (file)
@@ -251,6 +251,9 @@ static inline int llapi_create_volatile(char *directory, int mode)
 }
 
 
 }
 
 
+extern int llapi_fswap_layouts(const int fd1, const int fd2);
+extern int llapi_swap_layouts(const char *path1, const char *path2);
+
 /* Changelog interface.  priv is private state, managed internally
    by these functions */
 #define CHANGELOG_FLAG_FOLLOW 0x01   /* Not yet implemented */
 /* Changelog interface.  priv is private state, managed internally
    by these functions */
 #define CHANGELOG_FLAG_FOLLOW 0x01   /* Not yet implemented */
index e753df1..7bded0f 100644 (file)
@@ -553,7 +553,7 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 
 #define OBD_IOC_GET_OBJ_VERSION        _IOR('f', 210, OBD_IOC_DATA_TYPE)
 
 
 #define OBD_IOC_GET_OBJ_VERSION        _IOR('f', 210, OBD_IOC_DATA_TYPE)
 
-/* <lustre/lustre_user.h> defines ioctl number 218 */
+/* <lustre/lustre_user.h> defines ioctl number 218-219 */
 #define OBD_IOC_GET_MNTOPT             _IOW('f', 220, mntopt_t)
 
 #define OBD_IOC_ECHO_MD                _IOR('f', 221, struct obd_ioctl_data)
 #define OBD_IOC_GET_MNTOPT             _IOW('f', 220, mntopt_t)
 
 #define OBD_IOC_ECHO_MD                _IOR('f', 221, struct obd_ioctl_data)
index 93bb4d1..ce694a5 100644 (file)
@@ -188,6 +188,7 @@ extern struct req_format RQF_MDS_QUOTACHECK;
 extern struct req_format RQF_MDS_QUOTACTL;
 extern struct req_format RQF_QC_CALLBACK;
 extern struct req_format RQF_QUOTA_DQACQ;
 extern struct req_format RQF_MDS_QUOTACTL;
 extern struct req_format RQF_QC_CALLBACK;
 extern struct req_format RQF_QUOTA_DQACQ;
+extern struct req_format RQF_MDS_SWAP_LAYOUTS;
 /* MDS hsm formats */
 extern struct req_format RQF_MDS_HSM_STATE_GET;
 extern struct req_format RQF_MDS_HSM_STATE_SET;
 /* MDS hsm formats */
 extern struct req_format RQF_MDS_HSM_STATE_GET;
 extern struct req_format RQF_MDS_HSM_STATE_SET;
@@ -282,6 +283,7 @@ extern struct req_msg_field RMF_OBD_QUOTACHECK;
 extern struct req_msg_field RMF_OBD_QUOTACTL;
 extern struct req_msg_field RMF_QUOTA_BODY;
 extern struct req_msg_field RMF_STRING;
 extern struct req_msg_field RMF_OBD_QUOTACTL;
 extern struct req_msg_field RMF_QUOTA_BODY;
 extern struct req_msg_field RMF_STRING;
+extern struct req_msg_field RMF_SWAP_LAYOUTS;
 extern struct req_msg_field RMF_MDS_HSM_PROGRESS;
 extern struct req_msg_field RMF_MDS_HSM_REQUEST;
 extern struct req_msg_field RMF_MDS_HSM_USER_ITEM;
 extern struct req_msg_field RMF_MDS_HSM_PROGRESS;
 extern struct req_msg_field RMF_MDS_HSM_REQUEST;
 extern struct req_msg_field RMF_MDS_HSM_USER_ITEM;
index dc86177..08065f3 100644 (file)
@@ -239,6 +239,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_HSM_REQUEST_NET           0x14c
 #define OBD_FAIL_MDS_HSM_CT_REGISTER_NET       0x14d
 #define OBD_FAIL_MDS_HSM_CT_UNREGISTER_NET     0x14e
 #define OBD_FAIL_MDS_HSM_REQUEST_NET           0x14c
 #define OBD_FAIL_MDS_HSM_CT_REGISTER_NET       0x14d
 #define OBD_FAIL_MDS_HSM_CT_UNREGISTER_NET     0x14e
+#define OBD_FAIL_MDS_SWAP_LAYOUTS_NET          0x14f
 #define OBD_FAIL_MDS_HSM_ACTION_NET            0x150
 
 /* layout lock */
 #define OBD_FAIL_MDS_HSM_ACTION_NET            0x150
 
 /* layout lock */
index 0a677f9..466d259 100644 (file)
@@ -1432,6 +1432,8 @@ out_rmdir:
                         putname(filename);
                RETURN(rc);
        }
                         putname(filename);
                RETURN(rc);
        }
+       case LL_IOC_LOV_SWAP_LAYOUTS:
+               RETURN(-EPERM);
         case LL_IOC_OBD_STATFS:
                 RETURN(ll_obd_statfs(inode, (void *)arg));
         case LL_IOC_LOV_GETSTRIPE:
         case LL_IOC_OBD_STATFS:
                 RETURN(ll_obd_statfs(inode, (void *)arg));
         case LL_IOC_LOV_GETSTRIPE:
index c2d75fc..f79ef9d 100644 (file)
@@ -861,6 +861,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
         ssize_t               result;
         ENTRY;
 
         ssize_t               result;
         ENTRY;
 
+restart:
         io = ccc_env_thread_io(env);
         ll_io_init(io, file, iot == CIT_WRITE);
 
         io = ccc_env_thread_io(env);
         ll_io_init(io, file, iot == CIT_WRITE);
 
@@ -919,6 +920,8 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
         GOTO(out, result);
 out:
         cl_io_fini(env, io);
         GOTO(out, result);
 out:
         cl_io_fini(env, io);
+       if (result == 0 && io->ci_need_restart) /* need to restart whole IO */
+               goto restart;
 
         if (iot == CIT_READ) {
                 if (result >= 0)
 
         if (iot == CIT_READ) {
                 if (result >= 0)
@@ -929,7 +932,7 @@ out:
                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
                                            LPROC_LL_WRITE_BYTES, result);
                        fd->fd_write_failed = false;
                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
                                            LPROC_LL_WRITE_BYTES, result);
                        fd->fd_write_failed = false;
-               } else {
+               } else if (result != -ERESTARTSYS) {
                        fd->fd_write_failed = true;
                }
        }
                        fd->fd_write_failed = true;
                }
        }
@@ -1843,13 +1846,74 @@ int ll_data_version(struct inode *inode, __u64 *data_version,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+static int ll_swap_layout(struct file *file, struct file *file2,
+                       struct lustre_swap_layouts *lsl)
 {
 {
-        struct inode *inode = file->f_dentry->d_inode;
-        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
-        int flags;
+       struct mdc_swap_layouts  msl = { .msl_flags = lsl->sl_flags };
+       struct md_op_data       *op_data;
+       struct inode            *inode = file->f_dentry->d_inode;
+       struct inode            *inode2 = file2->f_dentry->d_inode;
+       __u32 gid;
+       int rc;
 
 
-        ENTRY;
+       if (!S_ISREG(inode2->i_mode))
+               RETURN(-EINVAL);
+
+       if (inode_permission(inode, MAY_WRITE) ||
+           inode_permission(inode2, MAY_WRITE))
+               RETURN(-EPERM);
+
+       if (inode2->i_sb != inode->i_sb)
+               RETURN(-EXDEV);
+
+       rc = lu_fid_cmp(ll_inode2fid(inode), ll_inode2fid(inode2));
+       if (rc == 0) /* same file, done! */
+               RETURN(0);
+
+       if (rc < 0) { /* sequentialize it */
+               swap(inode, inode2);
+               swap(file, file2);
+       }
+
+       gid = lsl->sl_gid;
+       if (gid != 0) { /* application asks to flush dirty cache */
+               rc = ll_get_grouplock(inode, file, gid);
+               if (rc < 0)
+                       RETURN(rc);
+
+               rc = ll_get_grouplock(inode2, file2, gid);
+               if (rc < 0) {
+                       ll_put_grouplock(inode, file, gid);
+                       RETURN(rc);
+               }
+       }
+
+       /* struct md_op_data is used to send the swap args to the mdt;
+        * only the flags are missing, so we pass struct mdc_swap_layouts
+        * through md_op_data->op_data */
+       rc = -ENOMEM;
+       op_data = ll_prep_md_op_data(NULL, inode, inode2, NULL, 0, 0,
+                                       LUSTRE_OPC_ANY, &msl);
+       if (op_data != NULL) {
+               rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(inode),
+                                       sizeof(*op_data), op_data, NULL);
+               ll_finish_md_op_data(op_data);
+       }
+
+       if (gid != 0) {
+               ll_put_grouplock(inode2, file2, gid);
+               ll_put_grouplock(inode, file, gid);
+       }
+
+       RETURN(rc);
+}
+
+long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+       struct inode            *inode = file->f_dentry->d_inode;
+       struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
+       int                      flags, rc;
+       ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
                inode->i_generation, inode, cmd);
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
                inode->i_generation, inode, cmd);
@@ -1889,6 +1953,27 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
                 RETURN(ll_lov_setstripe(inode, file, arg));
         case LL_IOC_LOV_SETEA:
                 RETURN(ll_lov_setea(inode, file, arg));
                 RETURN(ll_lov_setstripe(inode, file, arg));
         case LL_IOC_LOV_SETEA:
                 RETURN(ll_lov_setea(inode, file, arg));
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
+               struct file *file2;
+               struct lustre_swap_layouts lsl;
+
+               if (cfs_copy_from_user(&lsl, (char *)arg,
+                                      sizeof(struct lustre_swap_layouts)))
+                       RETURN(-EFAULT);
+
+               if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
+                       RETURN(-EPERM);
+
+               file2 = cfs_get_fd(lsl.sl_fd);
+               if (file2 == NULL)
+                       RETURN(-EBADF);
+
+               rc = -EPERM;
+               if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
+                       rc = ll_swap_layout(file, file2, &lsl);
+               cfs_put_file(file2);
+               RETURN(rc);
+       }
         case LL_IOC_LOV_GETSTRIPE:
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
         case LL_IOC_LOV_GETSTRIPE:
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
@@ -3086,6 +3171,7 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
                rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
                                  lock->l_lvb_data, lock->l_lvb_len);
                if (rc >= 0) {
                rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
                                  lock->l_lvb_data, lock->l_lvb_len);
                if (rc >= 0) {
+                       *gen = LL_LAYOUT_GEN_EMPTY;
                        if (md.lsm != NULL)
                                *gen = md.lsm->lsm_layout_gen;
                        rc = 0;
                        if (md.lsm != NULL)
                                *gen = md.lsm->lsm_layout_gen;
                        rc = 0;
@@ -3165,7 +3251,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
        int rc;
        ENTRY;
 
-       *gen = LL_LAYOUT_GEN_ZERO;
+       *gen = LL_LAYOUT_GEN_NONE;
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
index a38ca9e..9329d6c 100644 (file)
@@ -402,6 +402,26 @@ enum stats_track_type {
 #define LL_SBI_LAYOUT_LOCK    0x20000 /* layout lock support */
 #define LL_SBI_USER_FID2PATH  0x40000 /* allow fid2path by unprivileged users */
 
 #define LL_SBI_LAYOUT_LOCK    0x20000 /* layout lock support */
 #define LL_SBI_USER_FID2PATH  0x40000 /* allow fid2path by unprivileged users */
 
+#define LL_SBI_FLAGS {         \
+       "nolck",        \
+       "checksum",     \
+       "flock",        \
+       "xattr",        \
+       "acl",          \
+       "rmt_client",   \
+       "mds_capa",     \
+       "oss_capa",     \
+       "flock",        \
+       "lru_resize",   \
+       "lazy_statfs",  \
+       "som",          \
+       "32bit_api",    \
+       "64bit_hash",   \
+       "agl",          \
+       "verbose",      \
+       "layout",       \
+       "user_fid2path" }
+
 /* default value for ll_sb_info->contention_time */
 #define SBI_DEFAULT_CONTENTION_SECONDS     60
 /* default value for lockless_truncate_enable */
 /* default value for ll_sb_info->contention_time */
 #define SBI_DEFAULT_CONTENTION_SECONDS     60
 /* default value for lockless_truncate_enable */
@@ -1605,7 +1625,11 @@ struct if_quotactl_18 {
 #warning "remove old LL_IOC_QUOTACTL_18 compatibility code"
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) */
 
 #warning "remove old LL_IOC_QUOTACTL_18 compatibility code"
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) */
 
-#define LL_LAYOUT_GEN_ZERO     ((__u32)-1)
+enum {
+       LL_LAYOUT_GEN_NONE  = ((__u32)-2),      /* layout lock was cancelled */
+       LL_LAYOUT_GEN_EMPTY = ((__u32)-1)       /* for empty layout */
+};
+
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 
index af98592..0e343f8 100644 (file)
@@ -916,7 +916,7 @@ void ll_lli_init(struct ll_inode_info *lli)
        mutex_init(&lli->lli_och_mutex);
        spin_lock_init(&lli->lli_agl_lock);
        lli->lli_has_smd = false;
        mutex_init(&lli->lli_och_mutex);
        spin_lock_init(&lli->lli_agl_lock);
        lli->lli_has_smd = false;
-       lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
+       lli->lli_layout_gen = LL_LAYOUT_GEN_NONE;
        lli->lli_clob = NULL;
 
        LASSERT(lli->lli_vfs_inode.i_mode != 0);
        lli->lli_clob = NULL;
 
        LASSERT(lli->lli_vfs_inode.i_mode != 0);
index dedc17f..a8dba54 100644 (file)
@@ -714,6 +714,32 @@ static int ll_rd_maxea_size(char *page, char **start, off_t off,
         return snprintf(page, count, "%u\n", ealen);
 }
 
         return snprintf(page, count, "%u\n", ealen);
 }
 
+static int ll_rd_sbi_flags(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
+{
+       const char *str[] = LL_SBI_FLAGS;
+       struct super_block *sb = data;
+       int flags = ll_s2sbi(sb)->ll_flags;
+       int i = 0;
+       int rc = 0;
+
+       while (flags != 0) {
+               if (ARRAY_SIZE(str) <= i) {
+                       CERROR("%s: Revise array LL_SBI_FLAGS to match sbi "
+                               "flags please.\n", ll_get_fsname(sb, NULL, 0));
+                       return -EINVAL;
+               }
+
+               if (flags & 0x1)
+                       rc += snprintf(page + rc, count - rc, "%s ", str[i]);
+               flags >>= 1;
+               ++i;
+       }
+       if (rc > 0)
+               rc += snprintf(page + rc, count - rc, "\b\n");
+       return rc;
+}
+
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -744,6 +770,7 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "statahead_stats",  ll_rd_statahead_stats, 0, 0 },
         { "lazystatfs",       ll_rd_lazystatfs, ll_wr_lazystatfs, 0 },
         { "max_easize",       ll_rd_maxea_size, 0, 0 },
         { "statahead_stats",  ll_rd_statahead_stats, 0, 0 },
         { "lazystatfs",       ll_rd_lazystatfs, ll_wr_lazystatfs, 0 },
         { "max_easize",       ll_rd_maxea_size, 0, 0 },
+       { "sbi_flags",        ll_rd_sbi_flags, 0, 0 },
         { 0 }
 };
 
         { 0 }
 };
 
index 18e30b6..9020d77 100644 (file)
@@ -65,6 +65,39 @@ int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
         return vio->cui_io_subtype == IO_NORMAL;
 }
 
         return vio->cui_io_subtype == IO_NORMAL;
 }
 
+/**
+ * For swapping layout. The file's layout may have changed.
+ * To avoid populating pages to a wrong stripe, we have to verify the
+ * correctness of the layout. This works because any process swapping
+ * layouts has to acquire the group lock.
+ */
+static bool can_populate_pages(const struct lu_env *env, struct cl_io *io,
+                               struct inode *inode)
+{
+       struct ll_inode_info    *lli = ll_i2info(inode);
+       struct ccc_io           *cio = ccc_env_io(env);
+       bool rc = true;
+
+       switch (io->ci_type) {
+       case CIT_READ:
+       case CIT_WRITE:
+               /* no lock needed to check lli_layout_gen: we hold the extent
+                * lock and the GROUP lock must be held to swap layouts */
+               if (lli->lli_layout_gen != cio->cui_layout_gen) {
+                       io->ci_need_restart = 1;
+                       /* the application will see a short read/write */
+                       io->ci_continue = 0;
+                       rc = false;
+               }
+       case CIT_FAULT:
+               /* fault is okay because we've already had a page. */
+       default:
+               break;
+       }
+
+       return rc;
+}
+
 /*****************************************************************************
  *
  * io operations.
 /*****************************************************************************
  *
  * io operations.
@@ -452,6 +485,9 @@ static int vvp_io_read_start(const struct lu_env *env,
 
         CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
 
 
         CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
 
+       if (!can_populate_pages(env, io, inode))
+               return 0;
+
         result = ccc_prep_size(env, obj, io, pos, tot, &exceed);
         if (result != 0)
                 return result;
         result = ccc_prep_size(env, obj, io, pos, tot, &exceed);
         if (result != 0)
                 return result;
@@ -542,6 +578,9 @@ static int vvp_io_write_start(const struct lu_env *env,
 
         ENTRY;
 
 
         ENTRY;
 
+       if (!can_populate_pages(env, io, inode))
+               return 0;
+
         if (cl_io_is_append(io)) {
                 /*
                  * PARALLEL IO This has to be changed for parallel IO doing
         if (cl_io_is_append(io)) {
                 /*
                  * PARALLEL IO This has to be changed for parallel IO doing
index 20dcdf9..5b78deb 100644 (file)
@@ -128,6 +128,11 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
 {
        struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
 
 {
        struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
 
+       if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
+               lli->lli_layout_gen = LL_LAYOUT_GEN_NONE;
+               return 0;
+       }
+
        if (conf->coc_opc != OBJECT_CONF_SET)
                return 0;
 
        if (conf->coc_opc != OBJECT_CONF_SET)
                return 0;
 
@@ -143,7 +148,7 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
                        lli->lli_layout_gen);
 
                lli->lli_has_smd = false;
                        lli->lli_layout_gen);
 
                lli->lli_has_smd = false;
-               lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
+               lli->lli_layout_gen = LL_LAYOUT_GEN_EMPTY;
        }
        return 0;
 }
        }
        return 0;
 }
index f4f2861..060e629 100644 (file)
@@ -162,13 +162,28 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                 r0->lo_sub[idx]->lso_index = idx;
                 result = 0;
         } else {
                 r0->lo_sub[idx]->lso_index = idx;
                 result = 0;
         } else {
-                CERROR("Stripe is already owned by other file (%d).\n", idx);
-                LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
-                LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
-                                "old\n");
-                LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
-                cl_object_put(env, stripe);
-                result = -EIO;
+               struct lu_object  *old_obj;
+               struct lov_object *old_lov;
+               unsigned int mask = D_INODE;
+
+               old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
+               LASSERT(old_obj != NULL);
+               old_lov = cl2lov(lu2cl(old_obj));
+               if (old_lov->lo_layout_invalid) {
+                       /* the object's layout has already changed but isn't
+                        * refreshed */
+                       lu_object_unhash(env, &stripe->co_lu);
+                       result = -EAGAIN;
+               } else {
+                       mask = D_ERROR;
+                       result = -EIO;
+               }
+
+               LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
+                               "stripe %d is already owned.\n", idx);
+               LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
+               LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
+               cl_object_put(env, stripe);
         }
         return result;
 }
         }
         return result;
 }
@@ -223,10 +238,15 @@ static int lov_init_raid0(const struct lu_env *env,
                         * lu_obj_hop_keycmp() */
                        /* coverity[overrun-buffer-val] */
                         stripe = lov_sub_find(env, subdev, ofid, subconf);
                         * lu_obj_hop_keycmp() */
                        /* coverity[overrun-buffer-val] */
                         stripe = lov_sub_find(env, subdev, ofid, subconf);
-                        if (!IS_ERR(stripe))
+                        if (!IS_ERR(stripe)) {
                                 result = lov_init_sub(env, lov, stripe, r0, i);
                                 result = lov_init_sub(env, lov, stripe, r0, i);
-                        else
+                               if (result == -EAGAIN) { /* try again */
+                                       --i;
+                                       result = 0;
+                               }
+                        } else {
                                 result = PTR_ERR(stripe);
                                 result = PTR_ERR(stripe);
+                       }
                 }
         } else
                 result = -ENOMEM;
                 }
         } else
                 result = -ENOMEM;
@@ -348,7 +368,7 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
 static int lov_print_empty(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
 {
 static int lov_print_empty(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
 {
-        (*p)(env, cookie, "empty\n");
+        (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
         return 0;
 }
 
         return 0;
 }
 
@@ -357,9 +377,13 @@ static int lov_print_raid0(const struct lu_env *env, void *cookie,
 {
         struct lov_object       *lov = lu2lov(o);
         struct lov_layout_raid0 *r0  = lov_r0(lov);
 {
         struct lov_object       *lov = lu2lov(o);
         struct lov_layout_raid0 *r0  = lov_r0(lov);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
         int i;
 
         int i;
 
-        (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
+        (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n",
+               r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm,
+               lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+               lsm->lsm_stripe_count, lsm->lsm_layout_gen);
         for (i = 0; i < r0->lo_nr; ++i) {
                 struct lu_object *sub;
 
         for (i = 0; i < r0->lo_nr; ++i) {
                 struct lu_object *sub;
 
@@ -573,10 +597,11 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 }
 
 static int lov_layout_change(const struct lu_env *unused,
 }
 
 static int lov_layout_change(const struct lu_env *unused,
-                             struct lov_object *lov, enum lov_layout_type llt,
+                             struct lov_object *lov,
                              const struct cl_object_conf *conf)
 {
        int result;
                              const struct cl_object_conf *conf)
 {
        int result;
+       enum lov_layout_type llt = LLT_EMPTY;
        union lov_layout_state *state = &lov->u;
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
        union lov_layout_state *state = &lov->u;
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
@@ -585,10 +610,13 @@ static int lov_layout_change(const struct lu_env *unused,
        void *cookie;
        struct lu_env *env;
        int refcheck;
        void *cookie;
        struct lu_env *env;
        int refcheck;
+       ENTRY;
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
+
+       if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
+               llt = LLT_RAID0; /* only raid0 is supported. */
        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
-       ENTRY;
 
        cookie = cl_env_reenter();
        env = cl_env_get(&refcheck);
 
        cookie = cl_env_reenter();
        env = cl_env_get(&refcheck);
@@ -605,7 +633,6 @@ static int lov_layout_change(const struct lu_env *unused,
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
-               LASSERT(cfs_list_empty(&hdr->coh_locks));
                LASSERT(hdr->coh_tree.rnode == NULL);
                LASSERT(hdr->coh_pages == 0);
 
                LASSERT(hdr->coh_tree.rnode == NULL);
                LASSERT(hdr->coh_pages == 0);
 
@@ -697,24 +724,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                GOTO(out, result = -EBUSY);
        }
 
                GOTO(out, result = -EBUSY);
        }
 
-       /*
-        * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
-        */
-       switch (lov->lo_type) {
-       case LLT_EMPTY:
-               if (lsm != NULL)
-                       result = lov_layout_change(env, lov, LLT_RAID0, conf);
-               break;
-       case LLT_RAID0:
-               if (lsm == NULL)
-                       result = lov_layout_change(env, lov, LLT_EMPTY, conf);
-               else if (lov_stripe_md_cmp(lov->lo_lsm, lsm))
-                       result = -EOPNOTSUPP;
-               break;
-       default:
-               LBUG();
-       }
-       lov->lo_layout_invalid = result != 0;
+       lov->lo_layout_invalid = lov_layout_change(env, lov, conf);
        EXIT;
 
 out:
        EXIT;
 
 out:
@@ -745,7 +755,7 @@ static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
 static int lov_object_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct lu_object *o)
 {
 static int lov_object_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct lu_object *o)
 {
-        return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
+        return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
 }
 
 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
 }
 
 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
index f32b4a8..13f0501 100644 (file)
@@ -52,11 +52,13 @@ static inline void lprocfs_mdc_init_vars(struct lprocfs_static_vars *lvars)
 void mdc_pack_body(struct ptlrpc_request *req, const struct lu_fid *fid,
                    struct obd_capa *oc, __u64 valid, int ea_size,
                    __u32 suppgid, int flags);
 void mdc_pack_body(struct ptlrpc_request *req, const struct lu_fid *fid,
                    struct obd_capa *oc, __u64 valid, int ea_size,
                    __u32 suppgid, int flags);
-void mdc_pack_capa(struct ptlrpc_request *req, const struct req_msg_field *field,
-                   struct obd_capa *oc);
+void mdc_pack_capa(struct ptlrpc_request *req,
+                  const struct req_msg_field *field, struct obd_capa *oc);
 int mdc_pack_req(struct ptlrpc_request *req, int version, int opc);
 void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
                         const struct lu_fid *cfid, int flags);
 int mdc_pack_req(struct ptlrpc_request *req, int version, int opc);
 void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
                         const struct lu_fid *cfid, int flags);
+void mdc_swap_layouts_pack(struct ptlrpc_request *req,
+                          struct md_op_data *op_data);
 void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff, __u32 size,
                       const struct lu_fid *fid, struct obd_capa *oc);
 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
 void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff, __u32 size,
                       const struct lu_fid *fid, struct obd_capa *oc);
 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
@@ -168,4 +170,12 @@ ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh);
 
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh);
 
+static inline int mdc_prep_elc_req(struct obd_export *exp,
+                                  struct ptlrpc_request *req, int opc,
+                                  cfs_list_t *cancels, int count)
+{
+       return ldlm_prep_elc_req(exp, req, LUSTRE_MDS_VERSION, opc, 0, cancels,
+                                count);
+}
+
 #endif
 #endif
index 6486db3..9dcbfa0 100644 (file)
@@ -94,6 +94,21 @@ void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
         b->flags = flags;
 }
 
         b->flags = flags;
 }
 
+void mdc_swap_layouts_pack(struct ptlrpc_request *req,
+                          struct md_op_data *op_data)
+{
+       struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
+                                                   &RMF_MDT_BODY);
+
+       __mdc_pack_body(b, op_data->op_suppgids[0]);
+       b->fid1 = op_data->op_fid1;
+       b->fid2 = op_data->op_fid2;
+       b->valid |= OBD_MD_FLID;
+
+       mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+       mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
+}
+
 void mdc_pack_body(struct ptlrpc_request *req,
                    const struct lu_fid *fid, struct obd_capa *oc,
                    __u64 valid, int ea_size, __u32 suppgid, int flags)
 void mdc_pack_body(struct ptlrpc_request *req,
                    const struct lu_fid *fid, struct obd_capa *oc,
                    __u64 valid, int ea_size, __u32 suppgid, int flags)
index d359bd4..ae28bf3 100644 (file)
@@ -105,13 +105,6 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
         RETURN(count);
 }
 
         RETURN(count);
 }
 
-static int mdc_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
-                            cfs_list_t *cancels, int count)
-{
-        return ldlm_prep_elc_req(exp, req, LUSTRE_MDS_VERSION, MDS_REINT,
-                                 0, cancels, count);
-}
-
 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 void *ea, int ealen, void *ea2, int ea2len,
                 struct ptlrpc_request **request, struct md_open_data **mod)
 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 void *ea, int ealen, void *ea2, int ea2len,
                 struct ptlrpc_request **request, struct md_open_data **mod)
@@ -148,11 +141,11 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
                              ea2len);
 
         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
                              ea2len);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         rpc_lock = obd->u.cli.cl_rpc_lock;
 
 
         rpc_lock = obd->u.cli.cl_rpc_lock;
 
@@ -263,11 +256,11 @@ rebuild:
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                              data && datalen ? datalen : 0);
 
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                              data && datalen ? datalen : 0);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
@@ -362,11 +355,11 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         mdc_unlink_pack(req, op_data);
 
 
         mdc_unlink_pack(req, op_data);
 
@@ -414,11 +407,11 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         mdc_link_pack(req, op_data);
         ptlrpc_request_set_replen(req);
 
         mdc_link_pack(req, op_data);
         ptlrpc_request_set_replen(req);
@@ -474,11 +467,11 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
 
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         if (exp_connect_cancelset(exp) && req)
                 ldlm_cli_cancel_list(&cancels, count, req, 0);
 
         if (exp_connect_cancelset(exp) && req)
                 ldlm_cli_cancel_list(&cancels, count, req, 0);
index dc366af..df9376b 100644 (file)
@@ -1702,6 +1702,63 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+static int mdc_ioc_swap_layouts(struct obd_export *exp,
+                               struct md_op_data *op_data)
+{
+       CFS_LIST_HEAD(cancels);
+       struct ptlrpc_request   *req;
+       int                      rc, count;
+       struct mdc_swap_layouts *msl, *payload;
+       ENTRY;
+
+       msl = op_data->op_data;
+
+       /* When the MDT gets the MDS_SWAP_LAYOUTS RPC, the
+        * first thing it does is cancel the 2 layout
+        * locks held by this client.
+        * So the client must cancel its layout locks on the 2 fids
+        * along with the request RPC to avoid extra RPC round trips.
+        */
+       count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
+                                       LCK_CR, MDS_INODELOCK_LAYOUT);
+       count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
+                                        LCK_CR, MDS_INODELOCK_LAYOUT);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_SWAP_LAYOUTS);
+       if (req == NULL) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               RETURN(-ENOMEM);
+       }
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+       mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+
+       rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_swap_layouts_pack(req, op_data);
+
+       payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
+       LASSERT(payload);
+
+       *payload = *msl;
+
+       ptlrpc_request_set_replen(req);
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+       EXIT;
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
@@ -1837,6 +1894,10 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                else
                        GOTO(out, rc = 0);
        }
                else
                        GOTO(out, rc = 0);
        }
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
+               rc = mdc_ioc_swap_layouts(exp, karg);
+               break;
+       }
         default:
                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 GOTO(out, rc = -ENOTTY);
         default:
                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 GOTO(out, rc = -ENOTTY);
index 59b6d34..3d1febd 100644 (file)
@@ -427,7 +427,7 @@ void mdt_client_compatibility(struct mdt_thread_info *info)
         struct lu_attr        *la = &ma->ma_attr;
         ENTRY;
 
         struct lu_attr        *la = &ma->ma_attr;
         ENTRY;
 
-       if (exp_connect_flags(exp) & OBD_CONNECT_LAYOUTLOCK)
+       if (exp_connect_layout(exp))
                /* the client can deal with 16-bit lmm_stripe_count */
                RETURN_EXIT;
 
                /* the client can deal with 16-bit lmm_stripe_count */
                RETURN_EXIT;
 
@@ -1006,6 +1006,95 @@ int mdt_is_subdir(struct mdt_thread_info *info)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+int mdt_swap_layouts(struct mdt_thread_info *info)
+{
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct obd_export       *exp = req->rq_export;
+       struct mdt_object       *o1, *o2, *o;
+       struct mdt_lock_handle  *lh1, *lh2;
+       struct mdc_swap_layouts *msl;
+       int                      rc;
+       ENTRY;
+
+       /* If the client does not support layout lock, layout swapping
+        * is disabled.
+        * FIXME: there is a problem for old clients which don't support
+        * layout lock yet. If those clients have already opened the file
+        * they won't be notified at all, so the old layout may still be
+        * used to do IO. This can be fixed after file release is landed by
+        * doing exclusive open and taking full EX ibits lock. - Jinshan */
+       if (!exp_connect_layout(exp))
+               RETURN(-EOPNOTSUPP);
+
+       if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
+               mdt_set_capainfo(info, 0, &info->mti_body->fid1,
+                                req_capsule_client_get(info->mti_pill,
+                                                       &RMF_CAPA1));
+
+       if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
+               mdt_set_capainfo(info, 1, &info->mti_body->fid2,
+                                req_capsule_client_get(info->mti_pill,
+                                                       &RMF_CAPA2));
+
+       o1 = info->mti_object;
+       o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+                               &info->mti_body->fid2);
+       if (IS_ERR(o))
+               GOTO(out, rc = PTR_ERR(o));
+
+       if (mdt_object_exists(o) < 0) /* remote object */
+               GOTO(put, rc = -ENOENT);
+
+       rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
+       if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
+               GOTO(put, rc);
+
+       if (rc < 0)
+               swap(o1, o2);
+
+       /* Permission check: make sure the calling process has permission
+        * to write both files. */
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+                               MAY_WRITE);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+                               MAY_WRITE);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
+       LASSERT(msl != NULL);
+
+       lh1 = &info->mti_lh[MDT_LH_NEW];
+       mdt_lock_reg_init(lh1, LCK_EX);
+       lh2 = &info->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lh2, LCK_EX);
+
+       rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT,
+                            MDT_LOCAL_LOCK);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT,
+                            MDT_LOCAL_LOCK);
+       if (rc < 0)
+               GOTO(unlock1, rc);
+
+       rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+                            mdt_object_child(o2), msl->msl_flags);
+       GOTO(unlock2, rc);
+unlock2:
+       mdt_object_unlock(info, o2, lh2, rc);
+unlock1:
+       mdt_object_unlock(info, o1, lh1, rc);
+put:
+       mdt_object_put(info->mti_env, o);
+out:
+       RETURN(rc);
+}
+
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                           struct mdt_object *parent,
                           const struct lu_name *lname,
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                           struct mdt_object *parent,
                           const struct lu_name *lname,
@@ -3167,6 +3256,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg)
         case MDS_QUOTACHECK:
         case MDS_QUOTACTL:
        case UPDATE_OBJ:
         case MDS_QUOTACHECK:
         case MDS_QUOTACTL:
        case UPDATE_OBJ:
+       case MDS_SWAP_LAYOUTS:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
         case SEQ_QUERY:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
         case SEQ_QUERY:
index 57ada59..f2ccb95 100644 (file)
@@ -798,6 +798,7 @@ extern struct mdt_opc_slice mdt_fld_handlers[];
 int mdt_quotacheck(struct mdt_thread_info *info);
 int mdt_quotactl(struct mdt_thread_info *info);
 int mdt_quota_dqacq(struct mdt_thread_info *info);
 int mdt_quotacheck(struct mdt_thread_info *info);
 int mdt_quotactl(struct mdt_thread_info *info);
 int mdt_quota_dqacq(struct mdt_thread_info *info);
+int mdt_swap_layouts(struct mdt_thread_info *info);
 
 extern struct lprocfs_vars lprocfs_mds_module_vars[];
 extern struct lprocfs_vars lprocfs_mds_obd_vars[];
 
 extern struct lprocfs_vars lprocfs_mds_module_vars[];
 extern struct lprocfs_vars lprocfs_mds_obd_vars[];
index fb37a2c..4e5e7c5 100644 (file)
@@ -152,6 +152,7 @@ DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
                                                mdt_hsm_state_set),
 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
 DEF_MDT_HDL(0          | HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
                                                mdt_hsm_state_set),
 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
 DEF_MDT_HDL(0          | HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
+DEF_MDT_HDL(HABEO_CORPUS|HABEO_REFERO, MDS_SWAP_LAYOUTS, mdt_swap_layouts)
 };
 
 #define DEF_OBD_HDL(flags, name, fn)                                   \
 };
 
 #define DEF_OBD_HDL(flags, name, fn)                                   \
index ecfe76e..aab7682 100644 (file)
@@ -130,6 +130,7 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io)
        switch(io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
        switch(io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
+               break;
        case CIT_FAULT:
        case CIT_FSYNC:
                LASSERT(!io->ci_need_restart);
        case CIT_FAULT:
        case CIT_FSYNC:
                LASSERT(!io->ci_need_restart);
index 6392139..fc2596f 100644 (file)
@@ -143,7 +143,8 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
          * and LRU lock, no race with concurrent object lookup is possible
          * and we can safely destroy object below.
          */
          * and LRU lock, no race with concurrent object lookup is possible
          * and we can safely destroy object below.
          */
-        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
+               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
         /*
          * Object was already removed from hash and lru above, can
         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
         /*
          * Object was already removed from hash and lru above, can
@@ -159,13 +160,34 @@ EXPORT_SYMBOL(lu_object_put);
  */
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
 {
  */
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
 {
-       set_bit(LU_OBJECT_HEARD_BANSHEE,
-                   &o->lo_header->loh_flags);
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
        return lu_object_put(env, o);
 }
 EXPORT_SYMBOL(lu_object_put_nocache);
 
 /**
        return lu_object_put(env, o);
 }
 EXPORT_SYMBOL(lu_object_put_nocache);
 
 /**
+ * Kill the object: mark it as dying and take it out of the LRU list and
+ * the site object hash. Currently used by client code for layout change.
+ */
+void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
+{
+       struct lu_object_header *top;
+
+       top = o->lo_header;
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
+               cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
+               cfs_hash_bd_t bd;
+
+               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               cfs_list_del_init(&top->loh_lru);
+               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
+               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+       }
+}
+EXPORT_SYMBOL(lu_object_unhash);
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
index fe04c8c..84642b6 100644 (file)
@@ -294,6 +294,15 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = {
         &RMF_EADATA
 };
 
         &RMF_EADATA
 };
 
+static const struct req_msg_field *mdt_swap_layouts[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_MDT_BODY,
+       &RMF_SWAP_LAYOUTS,
+       &RMF_CAPA1,
+       &RMF_CAPA2,
+       &RMF_DLM_REQ
+};
+
 static const struct req_msg_field *obd_connect_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_TGTUUID,
 static const struct req_msg_field *obd_connect_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_TGTUUID,
@@ -678,9 +687,8 @@ static struct req_format *req_formats[] = {
        &RQF_MDS_HSM_STATE_SET,
        &RQF_MDS_HSM_ACTION,
        &RQF_MDS_HSM_REQUEST,
        &RQF_MDS_HSM_STATE_SET,
        &RQF_MDS_HSM_ACTION,
        &RQF_MDS_HSM_REQUEST,
-
+       &RQF_MDS_SWAP_LAYOUTS,
        &RQF_UPDATE_OBJ,
        &RQF_UPDATE_OBJ,
-
        &RQF_QC_CALLBACK,
         &RQF_OST_CONNECT,
         &RQF_OST_DISCONNECT,
        &RQF_QC_CALLBACK,
         &RQF_OST_CONNECT,
         &RQF_OST_DISCONNECT,
@@ -1104,6 +1112,10 @@ struct req_msg_field RMF_UPDATE_REPLY = DEFINE_MSGF("update_reply", 0, -1,
                                                    NULL);
 EXPORT_SYMBOL(RMF_UPDATE_REPLY);
 
                                                    NULL);
 EXPORT_SYMBOL(RMF_UPDATE_REPLY);
 
+struct req_msg_field RMF_SWAP_LAYOUTS =
+       DEFINE_MSGF("swap_layouts", 0, sizeof(struct  mdc_swap_layouts),
+                   lustre_swab_swap_layouts, NULL);
+EXPORT_SYMBOL(RMF_SWAP_LAYOUTS);
 /*
  * Request formats.
  */
 /*
  * Request formats.
  */
@@ -1437,6 +1449,11 @@ struct req_format RQF_MDS_HSM_REQUEST =
        DEFINE_REQ_FMT0("MDS_HSM_REQUEST", mdt_hsm_request, empty);
 EXPORT_SYMBOL(RQF_MDS_HSM_REQUEST);
 
        DEFINE_REQ_FMT0("MDS_HSM_REQUEST", mdt_hsm_request, empty);
 EXPORT_SYMBOL(RQF_MDS_HSM_REQUEST);
 
+struct req_format RQF_MDS_SWAP_LAYOUTS =
+       DEFINE_REQ_FMT0("MDS_SWAP_LAYOUTS",
+                       mdt_swap_layouts, empty);
+EXPORT_SYMBOL(RQF_MDS_SWAP_LAYOUTS);
+
 /* This is for split */
 struct req_format RQF_MDS_WRITEPAGE =
         DEFINE_REQ_FMT0("MDS_WRITEPAGE",
 /* This is for split */
 struct req_format RQF_MDS_WRITEPAGE =
         DEFINE_REQ_FMT0("MDS_WRITEPAGE",
index 814e939..03ed9b9 100644 (file)
@@ -101,6 +101,7 @@ struct ll_rpc_opcode {
        { MDS_HSM_REQUEST,  "mds_hsm_request" },
        { MDS_HSM_CT_REGISTER, "mds_hsm_ct_register" },
        { MDS_HSM_CT_UNREGISTER, "mds_hsm_ct_unregister" },
        { MDS_HSM_REQUEST,  "mds_hsm_request" },
        { MDS_HSM_CT_REGISTER, "mds_hsm_ct_register" },
        { MDS_HSM_CT_UNREGISTER, "mds_hsm_ct_unregister" },
+       { MDS_SWAP_LAYOUTS,     "mds_swap_layouts" },
         { LDLM_ENQUEUE,     "ldlm_enqueue" },
         { LDLM_CONVERT,     "ldlm_convert" },
         { LDLM_CANCEL,      "ldlm_cancel" },
         { LDLM_ENQUEUE,     "ldlm_enqueue" },
         { LDLM_CONVERT,     "ldlm_convert" },
         { LDLM_CANCEL,      "ldlm_cancel" },
index 19786ca..7c8c6ba 100644 (file)
@@ -2582,3 +2582,8 @@ void lustre_swab_update_reply_buf(struct update_reply *ur)
 }
 EXPORT_SYMBOL(lustre_swab_update_reply_buf);
 
 }
 EXPORT_SYMBOL(lustre_swab_update_reply_buf);
 
+void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl)
+{
+       __swab64s(&msl->msl_flags);
+}
+EXPORT_SYMBOL(lustre_swab_swap_layouts);
index 433a206..35cb38e 100644 (file)
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre_disk.h>
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre_disk.h>
-
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux testnode 2.6.32-279.5.1.el6_lustre.g53f705f.x86_64 #1 SMP Mon Oct 8 05:12
+         * running on Linux mercury 2.6.32-279.5.1.el6_lustre.x86_64 #1 SMP Tue Aug 21 00:00:41 PDT 2
          * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
          * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
@@ -179,7 +178,9 @@ void lustre_assert_wire_constants(void)
                 (long long)MDS_HSM_CT_REGISTER);
        LASSERTF(MDS_HSM_CT_UNREGISTER == 60, "found %lld\n",
                 (long long)MDS_HSM_CT_UNREGISTER);
                 (long long)MDS_HSM_CT_REGISTER);
        LASSERTF(MDS_HSM_CT_UNREGISTER == 60, "found %lld\n",
                 (long long)MDS_HSM_CT_UNREGISTER);
-       LASSERTF(MDS_LAST_OPC == 61, "found %lld\n",
+       LASSERTF(MDS_SWAP_LAYOUTS == 61, "found %lld\n",
+                (long long)MDS_SWAP_LAYOUTS);
+       LASSERTF(MDS_LAST_OPC == 62, "found %lld\n",
                 (long long)MDS_LAST_OPC);
        LASSERTF(REINT_SETATTR == 1, "found %lld\n",
                 (long long)REINT_SETATTR);
                 (long long)MDS_LAST_OPC);
        LASSERTF(REINT_SETATTR == 1, "found %lld\n",
                 (long long)REINT_SETATTR);
diff --git a/lustre/tests/racer/file_swap.sh b/lustre/tests/racer/file_swap.sh
new file mode 100755 (executable)
index 0000000..ac3c673
--- /dev/null
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+DIR=$1
+MAX=$2
+
+while : ; do
+       file=$((RANDOM % $MAX))
+       new_file=$((RANDOM % MAX))
+       $LFS swap_layouts $DIR/$file $DIR/$new_file 2>/dev/null
+done
index 3c5d753..d470133 100644 (file)
@@ -9298,6 +9298,115 @@ test_185() { # LU-2441
 }
 run_test 185 "Volatile file support"
 
 }
 run_test 185 "Volatile file support"
 
+# Return 0 (after calling skip) when "layout" is not found in
+# llite.*.sbi_flags, and 1 when it is, so that callers can write
+# "check_swap_layouts_support && return 0".
+check_swap_layouts_support()
+{
+       if ! $LCTL get_param -n llite.*.sbi_flags | grep -q layout; then
+               skip "Does not support layout lock."
+               return 0
+       fi
+       return 1
+}
+
+# test suite 184 is for LU-2016, LU-2017
+test_184a() {
+       check_swap_layouts_support && return 0
+
+       dir0=$DIR/$tdir/$testnum
+       test_mkdir -p $dir0 || error "creating dir $dir0"
+       ref1=/etc/passwd
+       ref2=/etc/group
+       file1=$dir0/f1
+       file2=$dir0/f2
+       # create the two files with different stripe settings and contents
+       $SETSTRIPE -c1 $file1
+       cp $ref1 $file1
+       $SETSTRIPE -c2 $file2
+       cp $ref2 $file2
+       # remember the values reported by "$GETSTRIPE -g" before the swap
+       gen1=$($GETSTRIPE -g $file1)
+       gen2=$($GETSTRIPE -g $file2)
+
+       $LFS swap_layouts $file1 $file2 || error "swap of file layout failed"
+       # both values must differ from the ones read before the swap
+       gen=$($GETSTRIPE -g $file1)
+       [[ $gen1 != $gen ]] ||
+               error "Layout generation on $file1 does not change"
+       gen=$($GETSTRIPE -g $file2)
+       [[ $gen2 != $gen ]] ||
+               error "Layout generation on $file2 does not change"
+
+       # after the swap each file must hold the other file's content
+       cmp $ref1 $file2 || error "content compare failed ($ref1 != $file2)"
+       cmp $ref2 $file1 || error "content compare failed ($ref2 != $file1)"
+}
+run_test 184a "Basic layout swap"
+
+# Check that layout swaps which must be refused are refused: each
+# swap_layouts call below is expected to return non-zero.
+test_184b() {
+       check_swap_layouts_support && return 0
+
+       dir0=$DIR/$tdir/$testnum
+       mkdir -p $dir0 || error "creating dir $dir0"
+
+       file1=$dir0/f1
+       file2=$dir0/f2
+       file3=$dir0/f3
+       dir1=$dir0/d1
+       dir2=$dir0/d2
+
+       mkdir $dir1 $dir2
+       $SETSTRIPE -c1 $file1
+       $SETSTRIPE -c2 $file2
+       $SETSTRIPE -c1 $file3
+       chown $RUNAS_ID $file3
+       gen1=$($GETSTRIPE -g $file1)
+       gen2=$($GETSTRIPE -g $file2)
+
+       if $LFS swap_layouts $dir1 $dir2; then
+               error "swap of directories layouts should fail"
+       fi
+       if $LFS swap_layouts $dir1 $file1; then
+               error "swap of directory and file layouts should fail"
+       fi
+       if $RUNAS $LFS swap_layouts $file1 $file2; then
+               error "swap of file we cannot write should fail"
+       fi
+       if $LFS swap_layouts $file1 $file3; then
+               error "swap of file with different owner should fail"
+       fi
+       /bin/true # to clear error code
+}
+run_test 184b "Forbidden layout swap (will generate errors)"
+
+test_184c() {
+       check_swap_layouts_support && return 0
+
+       dir0=$DIR/$tdir/$testnum
+       mkdir -p $dir0 || error "creating dir $dir0"
+       ref1=$dir0/ref1
+       ref2=$dir0/ref2
+       file1=$dir0/file1
+       file2=$dir0/file2
+       # create a file large enough for the concurrent test
+       dd if=/dev/urandom of=$ref1 bs=1M count=$((RANDOM % 50 + 20))
+       dd if=/dev/urandom of=$ref2 bs=1M count=$((RANDOM % 50 + 20))
+       echo "ref file size: ref1(`stat -c %s $ref1`), ref2(`stat -c %s $ref2`)"
+
+       cp $ref2 $file2
+       # start the writer in the background and remember its pid so that
+       # its exit status can be collected after the swap
+       dd if=$ref1 of=$file1 bs=64k &
+       local DD_PID=$!
+       sleep 0.$((RANDOM % 5 + 1))
+
+       $LFS swap_layouts $file1 $file2
+       rc=$?
+       # "wait <pid>" returns the exit status of that process
+       wait $DD_PID
+       [[ $? == 0 ]] || error "concurrent write on $file1 failed"
+       [[ $rc == 0 ]] || error "swap of $file1 and $file2 failed"
+
+       # how many bytes copied before swapping layout
+       local copied=`stat -c %s $file2`
+       local remaining=`stat -c %s $ref1`
+       remaining=$((remaining - copied))
+       echo "Copied $copied bytes before swapping layout..."
+
+       cmp -n $copied $file1 $ref2 ||
+               error "Content mismatch [0, $copied) of ref2 and file1"
+       cmp -n $copied $file2 $ref1 ||
+               error "Content mismatch [0, $copied) of ref1 and file2"
+       cmp -i $copied:$copied -n $remaining $file1 $ref1 ||
+               error "Content mismatch [$copied, EOF) of ref1 and file1"
+
+       # clean up
+       rm -f $ref1 $ref2 $file1 $file2
+}
+run_test 184c "Concurrent write and layout swap"
+
 # OST pools tests
 check_file_in_pool()
 {
 # OST pools tests
 check_file_in_pool()
 {
index e180ffb..0030e63 100644 (file)
@@ -116,7 +116,7 @@ static int lfs_hsm_restore(int argc, char **argv);
 static int lfs_hsm_release(int argc, char **argv);
 static int lfs_hsm_remove(int argc, char **argv);
 static int lfs_hsm_cancel(int argc, char **argv);
 static int lfs_hsm_release(int argc, char **argv);
 static int lfs_hsm_remove(int argc, char **argv);
 static int lfs_hsm_cancel(int argc, char **argv);
-
+static int lfs_swap_layouts(int argc, char **argv);
 
 /* all avaialable commands */
 command_t cmdlist[] = {
 
 /* all avaialable commands */
 command_t cmdlist[] = {
@@ -299,6 +299,8 @@ command_t cmdlist[] = {
        {"hsm_cancel", lfs_hsm_cancel, 0,
         "Cancel requests related to specified files.\n"
         "usage: hsm_cancel [--filelist FILELIST] [--data DATA] <file> ..."},
        {"hsm_cancel", lfs_hsm_cancel, 0,
         "Cancel requests related to specified files.\n"
         "usage: hsm_cancel [--filelist FILELIST] [--data DATA] <file> ..."},
+       {"swap_layouts", lfs_swap_layouts, 0, "Swap layouts between 2 files.\n"
+        "usage: swap_layouts <path1> <path2>"},
         {"help", Parser_help, 0, "help"},
         {"exit", Parser_quit, 0, "quit"},
         {"quit", Parser_quit, 0, "quit"},
         {"help", Parser_help, 0, "help"},
         {"exit", Parser_quit, 0, "quit"},
         {"quit", Parser_quit, 0, "quit"},
@@ -2899,7 +2901,7 @@ static int lfs_hsm_state(int argc, char **argv)
                if (rc) {
                        fprintf(stderr, "can't get hsm state for %s: %s\n",
                                path, strerror(errno = -rc));
                if (rc) {
                        fprintf(stderr, "can't get hsm state for %s: %s\n",
                                path, strerror(errno = -rc));
-                       return rc;
+                       return rc;
                }
 
                /* Display path name and status flags */
                }
 
                /* Display path name and status flags */
@@ -3308,6 +3310,14 @@ static int lfs_hsm_cancel(int argc, char **argv)
        return lfs_hsm_request(argc, argv, HUA_CANCEL);
 }
 
        return lfs_hsm_request(argc, argv, HUA_CANCEL);
 }
 
+/**
+ * Handler of the "swap_layouts <path1> <path2>" command.
+ *
+ * Returns CMD_HELP unless exactly two paths follow the command name,
+ * otherwise the result of llapi_swap_layouts() on those two paths.
+ */
+static int lfs_swap_layouts(int argc, char **argv)
+{
+       if (argc == 3)
+               return llapi_swap_layouts(argv[1], argv[2]);
+
+       return CMD_HELP;
+}
+
 int main(int argc, char **argv)
 {
         int rc;
 int main(int argc, char **argv)
 {
         int rc;
index f23f11f..4067bd4 100644 (file)
@@ -4138,4 +4138,59 @@ int llapi_create_volatile_idx(char *directory, int idx, int mode)
        return fd;
 }
 
        return fd;
 }
 
+/**
+ * Swap the layouts between 2 file descriptors.
+ * Both files must be open for writing.
+ * The ioctl is issued on fd1 and fd2 is passed in its argument; this is
+ * asymmetric but avoids the use of the root path for the ioctl.
+ *
+ * \retval 0 when the ioctl returns 0
+ * \retval -errno otherwise
+ */
+int llapi_fswap_layouts(int fd1, int fd2)
+{
+       struct lustre_swap_layouts swap;
+
+       /* seed the generator, then draw the value sent as sl_gid */
+       srandom(time(NULL));
+       swap.sl_gid = random();
+       swap.sl_flags = 0;
+       swap.sl_fd = fd2;
+
+       if (ioctl(fd1, LL_IOC_LOV_SWAP_LAYOUTS, &swap) == 0)
+               return 0;
+
+       return -errno;
+}
+
+/**
+ * Swap the layouts between 2 files
+ * the 2 files are open in write
+ *
+ * \param path1  path of the first file
+ * \param path2  path of the second file
+ *
+ * \retval 0 on success
+ * \retval negative errno of the failing open(2), or the negative value
+ *         returned by llapi_fswap_layouts()
+ */
+int llapi_swap_layouts(const char *path1, const char *path2)
+{
+       int     fd1, fd2, rc;
+
+       fd1 = open(path1, O_WRONLY);
+       if (fd1 < 0) {
+               /* save errno now: the calls below may overwrite it */
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                               "error: cannot open for write %s",
+                               path1);
+               return rc;
+       }
+
+       fd2 = open(path2, O_WRONLY);
+       if (fd2 < 0) {
+               /* save errno before llapi_error() and close() can change it */
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                               "error: cannot open for write %s",
+                               path2);
+               close(fd1);
+               return rc;
+       }
+
+       rc = llapi_fswap_layouts(fd1, fd2);
+       if (rc < 0)
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                       "error: cannot swap layouts between %s and %s\n",
+                       path1, path2);
+
+       close(fd1);
+       close(fd2);
+       return rc;
+}
index 711eace..8081901 100644 (file)
@@ -82,6 +82,7 @@
 #define lustre_swab_gl_desc NULL
 #define lustre_swab_mgs_config_body NULL
 #define lustre_swab_mgs_config_res NULL
 #define lustre_swab_gl_desc NULL
 #define lustre_swab_mgs_config_body NULL
 #define lustre_swab_mgs_config_res NULL
+#define lustre_swab_swap_layouts NULL
 #define lustre_swab_lu_fid NULL
 #define lustre_swab_hsm_progress_kernel NULL
 #define lustre_swab_hsm_user_item NULL
 #define lustre_swab_lu_fid NULL
 #define lustre_swab_hsm_progress_kernel NULL
 #define lustre_swab_hsm_user_item NULL
index e087797..01689a5 100644 (file)
@@ -2166,6 +2166,7 @@ main(int argc, char **argv)
        CHECK_VALUE(MDS_HSM_REQUEST);
        CHECK_VALUE(MDS_HSM_CT_REGISTER);
        CHECK_VALUE(MDS_HSM_CT_UNREGISTER);
        CHECK_VALUE(MDS_HSM_REQUEST);
        CHECK_VALUE(MDS_HSM_CT_REGISTER);
        CHECK_VALUE(MDS_HSM_CT_UNREGISTER);
+       CHECK_VALUE(MDS_SWAP_LAYOUTS);
        CHECK_VALUE(MDS_LAST_OPC);
 
        CHECK_VALUE(REINT_SETATTR);
        CHECK_VALUE(MDS_LAST_OPC);
 
        CHECK_VALUE(REINT_SETATTR);
index 834aa81..3a3f20e 100644 (file)
@@ -58,12 +58,11 @@ int main()
 
         return ret;
 }
 
         return ret;
 }
-
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux testnode 2.6.32-279.5.1.el6_lustre.g53f705f.x86_64 #1 SMP Mon Oct 8 05:12
+         * running on Linux mercury 2.6.32-279.5.1.el6_lustre.x86_64 #1 SMP Tue Aug 21 00:00:41 PDT 2
          * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
          * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
@@ -187,7 +186,9 @@ void lustre_assert_wire_constants(void)
                 (long long)MDS_HSM_CT_REGISTER);
        LASSERTF(MDS_HSM_CT_UNREGISTER == 60, "found %lld\n",
                 (long long)MDS_HSM_CT_UNREGISTER);
                 (long long)MDS_HSM_CT_REGISTER);
        LASSERTF(MDS_HSM_CT_UNREGISTER == 60, "found %lld\n",
                 (long long)MDS_HSM_CT_UNREGISTER);
-       LASSERTF(MDS_LAST_OPC == 61, "found %lld\n",
+       LASSERTF(MDS_SWAP_LAYOUTS == 61, "found %lld\n",
+                (long long)MDS_SWAP_LAYOUTS);
+       LASSERTF(MDS_LAST_OPC == 62, "found %lld\n",
                 (long long)MDS_LAST_OPC);
        LASSERTF(REINT_SETATTR == 1, "found %lld\n",
                 (long long)REINT_SETATTR);
                 (long long)MDS_LAST_OPC);
        LASSERTF(REINT_SETATTR == 1, "found %lld\n",
                 (long long)REINT_SETATTR);