Whamcloud - gitweb
LU-1876 hsm: layout lock implementation on server side
authorJinshan Xiong <jinshan.xiong@intel.com>
Thu, 6 Dec 2012 17:34:15 +0000 (09:34 -0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 15 Jan 2013 03:40:52 +0000 (22:40 -0500)
* enabled OBD_CONNECT_LAYOUTLOCK both on client and mdt
* mdt_object_lock_try() is implemented to try layout lock
* dlm flag LDLM_FL_BLOCK_NOWAIT is added to implement nonblock mode
  dlm lock enqueue
* try layout lock for IT_GETATTR and IT_OPEN
* make layout lock work
* add layout intent support.

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I84a6dfca83ded2344240fa5430c9922b7cfc7bcf
Reviewed-on: http://review.whamcloud.com/4417
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
39 files changed:
lustre/include/cl_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_fid.h
lustre/include/lustre_req_layout.h
lustre/include/obd_support.h
lustre/lclient/glimpse.c
lustre/lclient/lcommon_cl.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/llite_lib.h
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/namei.c
lustre/llite/statahead.c
lustre/llite/vvp_io.c
lustre/llite/vvp_object.c
lustre/lov/lov_lock.c
lustre/lov/lov_object.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lvb.c
lustre/mdt/mdt_open.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/tests/multiop.c
lustre/tests/sanity.sh
lustre/tests/sanityn.sh
lustre/utils/req-layout.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 3fbea9c..e78d05a 100644 (file)
@@ -277,10 +277,25 @@ struct cl_object_conf {
          */
         struct inode             *coc_inode;
        /**
-        * Invalidate the current stripe configuration due to losing
-        * layout lock.
+        * Layout lock handle.
         */
-       bool                      coc_invalidate;
+       struct ldlm_lock         *coc_lock;
+       /**
+        * Operation to handle layout, OBJECT_CONF_XYZ.
+        */
+       int                       coc_opc;
+};
+
+enum {
+       /** configure layout, set up a new stripe, must be called while
+        * holding layout lock. */
+       OBJECT_CONF_SET = 0,
+       /** invalidate the current stripe configuration due to losing
+        * layout lock. */
+       OBJECT_CONF_INVALIDATE = 1,
+       /** wait for old layout to go away so that new layout can be
+        * set up. */
+       OBJECT_CONF_WAIT = 2
 };
 
 /**
index 797d2c9..3c2e9cb 100644 (file)
@@ -1236,7 +1236,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                OBD_CONNECT_64BITHASH | OBD_CONNECT_JOBSTATS | \
                                OBD_CONNECT_EINPROGRESS | \
                                OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \
-                               OBD_CONNECT_LVB_TYPE)
+                               OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
@@ -1251,7 +1251,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_MAX_EASIZE | \
                                OBD_CONNECT_EINPROGRESS | \
                                OBD_CONNECT_JOBSTATS | \
-                               OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE)
+                               OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE|\
+                               OBD_CONNECT_LAYOUTLOCK)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \
                                OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \
@@ -1894,10 +1895,7 @@ extern void lustre_swab_generic_32s (__u32 *val);
 #define MDS_INODELOCK_OPEN   0x000004       /* For opened files */
 #define MDS_INODELOCK_LAYOUT 0x000008       /* for layout */
 
-/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits
- * XXX: MDS_INODELOCK_MAXSHIFT should be increased to 3 once the layout lock is
- * supported */
-#define MDS_INODELOCK_MAXSHIFT 2
+#define MDS_INODELOCK_MAXSHIFT 3
 /* This FULL lock is useful to take on unlink sort of operations */
 #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
 
@@ -3275,6 +3273,25 @@ struct getinfo_fid2path {
 
 void lustre_swab_fid2path (struct getinfo_fid2path *gf);
 
+enum {
+        LAYOUT_INTENT_ACCESS    = 0,
+        LAYOUT_INTENT_READ      = 1,
+        LAYOUT_INTENT_WRITE     = 2,
+        LAYOUT_INTENT_GLIMPSE   = 3,
+        LAYOUT_INTENT_TRUNC     = 4,
+        LAYOUT_INTENT_RELEASE   = 5,
+        LAYOUT_INTENT_RESTORE   = 6
+};
+
+/* enqueue layout lock with intent */
+struct layout_intent {
+       __u32 li_opc; /* intent operation for enqueue, read, write etc */
+       __u32 li_flags;
+       __u64 li_start;
+       __u64 li_end;
+};
+
+void lustre_swab_layout_intent(struct layout_intent *li);
 
 #endif
 /** @} lustreidl */
index 08b6e7d..5ad5f9b 100644 (file)
@@ -90,6 +90,7 @@ typedef enum {
         ELDLM_LOCK_ABORTED = 301,
         ELDLM_LOCK_REPLACED = 302,
         ELDLM_NO_LOCK_DATA = 303,
+        ELDLM_LOCK_WOULDBLOCK = 304,
 
         ELDLM_NAMESPACE_EXISTS = 400,
         ELDLM_BAD_NAMESPACE    = 401
@@ -1153,6 +1154,12 @@ struct ldlm_resource {
        struct inode            *lr_lvb_inode;
 };
 
+static inline bool ldlm_has_layout(struct ldlm_lock *lock)
+{
+       return lock->l_resource->lr_type == LDLM_IBITS &&
+               lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT;
+}
+
 static inline char *
 ldlm_ns_name(struct ldlm_namespace *ns)
 {
index 6376a9e..60595cd 100644 (file)
@@ -330,6 +330,11 @@ static inline int imp_connect_lru_resize(struct obd_import *imp)
         return !!(ocd->ocd_connect_flags & OBD_CONNECT_LRU_RESIZE);
 }
 
+static inline int exp_connect_layout(struct obd_export *exp)
+{
+       return !!(exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK);
+}
+
 static inline bool exp_connect_lvb_type(struct obd_export *exp)
 {
        LASSERT(exp != NULL);
index 99b88be..bd045cb 100644 (file)
@@ -498,6 +498,16 @@ static inline int fid_res_name_eq(const struct lu_fid *f,
                name->name[LUSTRE_RES_ID_VER_OID_OFF] == fid_ver_oid(f);
 }
 
+/* reverse function of fid_build_reg_res_name() */
+static inline void fid_build_from_res_name(struct lu_fid *f,
+                                          const struct ldlm_res_id *name)
+{
+       fid_zero(f);
+       f->f_seq = name->name[LUSTRE_RES_ID_SEQ_OFF];
+       f->f_oid = name->name[LUSTRE_RES_ID_VER_OID_OFF] & 0xffffffff;
+       f->f_ver = name->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32;
+       LASSERT(fid_res_name_eq(f, name));
+}
 
 static inline struct ldlm_res_id *
 fid_build_pdo_res_name(const struct lu_fid *f,
index 2d09357..697f736 100644 (file)
@@ -211,6 +211,7 @@ extern struct req_format RQF_LDLM_ENQUEUE_LVB;
 extern struct req_format RQF_LDLM_CONVERT;
 extern struct req_format RQF_LDLM_INTENT;
 extern struct req_format RQF_LDLM_INTENT_BASIC;
+extern struct req_format RQF_LDLM_INTENT_LAYOUT;
 extern struct req_format RQF_LDLM_INTENT_GETATTR;
 extern struct req_format RQF_LDLM_INTENT_OPEN;
 extern struct req_format RQF_LDLM_INTENT_CREATE;
@@ -257,6 +258,7 @@ extern struct req_msg_field RMF_DLM_REP;
 extern struct req_msg_field RMF_DLM_LVB;
 extern struct req_msg_field RMF_DLM_GL_DESC;
 extern struct req_msg_field RMF_LDLM_INTENT;
+extern struct req_msg_field RMF_LAYOUT_INTENT;
 extern struct req_msg_field RMF_MDT_MD;
 extern struct req_msg_field RMF_REC_REINT;
 extern struct req_msg_field RMF_EADATA;
index 12ea449..c99152d 100644 (file)
@@ -234,6 +234,11 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_OSC_CREATE_FAIL     0x147
 #define OBD_FAIL_MDS_NEGATIVE_POSITIVE  0x148
 
+/* layout lock */
+#define OBD_FAIL_MDS_NO_LL_GETATTR      0x170
+#define OBD_FAIL_MDS_NO_LL_OPEN                 0x171
+#define OBD_FAIL_MDS_LL_BLOCK           0x172
+
 /* CMD */
 #define OBD_FAIL_MDS_IS_SUBDIR_NET       0x180
 #define OBD_FAIL_MDS_IS_SUBDIR_PACK      0x181
@@ -446,6 +451,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE             0x1401
 #define OBD_FAIL_LOCK_STATE_WAIT_INTR               0x1402
 #define OBD_FAIL_LOV_INIT                          0x1403
+#define OBD_FAIL_GLIMPSE_DELAY                     0x1404
 
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   CFS_FAIL_PRECHECK(id)
index 5d91c75..c4b44de 100644 (file)
@@ -237,6 +237,8 @@ int cl_glimpse_size0(struct inode *inode, int agl)
                 else if (result == 0)
                         result = cl_glimpse_lock(env, io, inode, io->ci_obj,
                                                  agl);
+
+               OBD_FAIL_TIMEOUT(OBD_FAIL_GLIMPSE_DELAY, 2);
                 cl_io_fini(env, io);
                if (unlikely(io->ci_need_restart))
                        goto again;
index 5104717..0ad8f1e 100644 (file)
@@ -1193,15 +1193,13 @@ int cl_file_inode_init(struct inode *inode, struct lustre_md *md)
                         /*
                          * No locking is necessary, as new inode is
                          * locked by I_NEW bit.
-                         *
-                         * XXX not true for call from ll_update_inode().
                          */
                         lli->lli_clob = clob;
+                       lli->lli_has_smd = md->lsm != NULL;
                         lu_object_ref_add(&clob->co_lu, "inode", inode);
                 } else
                         result = PTR_ERR(clob);
-        } else
-                result = cl_conf_set(env, lli->lli_clob, &conf);
+        }
         cl_env_put(env, &refcheck);
 
         if (result != 0)
index e10a654..b68dd59 100644 (file)
@@ -190,8 +190,12 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
         LASSERT(cfs_list_empty(&res->lr_converting));
         check_res_locked(res);
 
-        if (!first_enq) {
-                LASSERT(work_list != NULL);
+       /* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */
+        if (!first_enq || (*flags & LDLM_FL_BLOCK_NOWAIT)) {
+               *err = ELDLM_LOCK_ABORTED;
+               if (*flags & LDLM_FL_BLOCK_NOWAIT)
+                       *err = ELDLM_LOCK_WOULDBLOCK;
+
                 rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
@@ -201,6 +205,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
 
                 ldlm_resource_unlink_lock(lock);
                 ldlm_grant_lock(lock, work_list);
+
+               *err = ELDLM_OK;
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
index debc6cf..747217b 100644 (file)
@@ -881,6 +881,20 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
         ldlm_lock_decref_internal_nolock(lock, mode);
 
+       /* release lvb data for layout lock */
+       if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers &&
+           ldlm_has_layout(lock) && lock->l_flags & LDLM_FL_LVB_READY) {
+               /* this is the last user of a layout lock and stripe has
+                * been set up, lvb is no longer used.
+                * This may be a large amount of memory, so we should free it
+                * when possible. */
+               if (lock->l_lvb_data != NULL) {
+                       OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
+                       lock->l_lvb_data = NULL;
+                       lock->l_lvb_len = 0;
+               }
+       }
+
         if (lock->l_flags & LDLM_FL_LOCAL &&
             !lock->l_readers && !lock->l_writers) {
                 /* If this is a local lock on a server namespace and this was
@@ -1567,7 +1581,8 @@ int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
                memcpy(data, lvb, size);
                break;
        default:
-               LDLM_ERROR(lock, "Unexpected LVB type");
+               LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
+               libcfs_debug_dumpstack(NULL);
                RETURN(-EINVAL);
        }
 
index 5517ed5..464a19a 100644 (file)
@@ -1411,9 +1411,9 @@ existing_lock:
                            "(err=%d, rc=%d)", err, rc);
 
                 if (rc == 0) {
-                       int lvb_len = ldlm_lvbo_size(lock);
-
-                       if (lvb_len > 0) {
+                       if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+                                                 RCL_SERVER) &&
+                           ldlm_lvbo_size(lock) > 0) {
                                void *buf;
                                int buflen;
 
@@ -1735,12 +1735,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                           lock->l_lvb_len, lvb_len);
                                GOTO(out, rc = -EINVAL);
                        }
-               } else { /* for layout lock, lvb has variable length */
+               } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
+                                                    * variable length */
                        void *lvb_data;
 
                        OBD_ALLOC(lvb_data, lvb_len);
                        if (lvb_data == NULL) {
-                               LDLM_ERROR(lock, "No memory.\n");
+                               LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
                                GOTO(out, rc = -ENOMEM);
                        }
 
@@ -1829,6 +1830,7 @@ out:
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_FAILED;
                unlock_res_and_lock(lock);
+               cfs_waitq_signal(&lock->l_waitq);
        }
        LDLM_LOCK_RELEASE(lock);
 }
index 3ad511a..494661e 100644 (file)
@@ -808,6 +808,8 @@ static int ldlm_resource_complain(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                res->lr_name.name[0], res->lr_name.name[1],
                res->lr_name.name[2], res->lr_name.name[3],
                cfs_atomic_read(&res->lr_refcount) - 1);
+
+       ldlm_resource_dump(D_ERROR, res);
         return 0;
 }
 
index f8e6e34..88b69b9 100644 (file)
@@ -117,6 +117,8 @@ struct llu_inode_info {
          * was opened several times without close, we track an
          * open_count here */
         struct ll_file_data    *lli_file_data;
+       /* checking lli_has_smd is reliable only inside an IO
+        * i.e, lov stripe has been held. */
        bool                    lli_has_smd;
         int                     lli_open_flags;
         int                     lli_open_count;
index 892626a..55a1654 100644 (file)
@@ -259,26 +259,14 @@ int ll_dops_init(struct dentry *de, int block, int init_sa)
 
 void ll_intent_drop_lock(struct lookup_intent *it)
 {
-        struct lustre_handle *handle;
-
         if (it->it_op && it->d.lustre.it_lock_mode) {
-               struct ldlm_lock *lock;
-
-               handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
-               lock = ldlm_handle2lock(handle);
-               if (lock != NULL) {
-                       /* it can only be allowed to match after layout is
-                        * applied to inode otherwise false layout would be
-                        * seen. Applying layout shoud happen before dropping
-                        * the intent lock. */
-                       if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT)
-                               ldlm_lock_allow_match(lock);
-                       LDLM_LOCK_PUT(lock);
-               }
+               struct lustre_handle handle;
+
+               handle.cookie = it->d.lustre.it_lock_handle;
 
                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
-                       " from it %p\n", handle->cookie, it);
-                ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
+                       " from it %p\n", handle.cookie, it);
+                ldlm_lock_decref(&handle, it->d.lustre.it_lock_mode);
 
                 /* bug 494: intent_release may be called multiple times, from
                  * this thread and we don't want to double-decref this lock */
@@ -351,7 +339,7 @@ int ll_revalidate_it_finish(struct ptlrpc_request *request,
         if (it_disposition(it, DISP_LOOKUP_NEG))
                 RETURN(-ENOENT);
 
-        rc = ll_prep_inode(&de->d_inode, request, NULL);
+        rc = ll_prep_inode(&de->d_inode, request, NULL, it);
 
         RETURN(rc);
 }
index 2fbeb61..b540065 100644 (file)
@@ -411,7 +411,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                 GOTO(out, rc);
         }
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
         if (!rc && itp->d.lustre.it_lock_mode)
                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
                                  itp, NULL);
@@ -1486,7 +1486,11 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
         if (rc == 0) {
                struct lov_stripe_md *lsm;
+               __u32 gen;
+
                put_user(0, &lumv1p->lmm_stripe_count);
+
+               ll_layout_refresh(inode, &gen);
                lsm = ccc_inode_lsm_get(inode);
                rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
                                   0, lsm, (void *)arg);
@@ -2501,7 +2505,7 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, NULL);
+                rc = ll_prep_inode(&inode, req, NULL, NULL);
         }
 out:
         ptlrpc_req_finished(req);
@@ -2909,10 +2913,126 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
 
        result = cl_conf_set(env, lli->lli_clob, conf);
        cl_env_nested_put(&nest, env);
+
+       if (conf->coc_opc == OBJECT_CONF_SET) {
+               struct ldlm_lock *lock = conf->coc_lock;
+
+               LASSERT(lock != NULL);
+               LASSERT(ldlm_has_layout(lock));
+               if (result == 0) {
+                       /* it can only be allowed to match after layout is
+                        * applied to inode otherwise false layout would be
+                        * seen. Applying layout shoud happen before dropping
+                        * the intent lock. */
+                       ldlm_lock_allow_match(lock);
+               }
+       }
        RETURN(result);
 }
 
 /**
+ * Apply the layout to the inode. Layout lock is held and will be released
+ * in this function.
+ */
+static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
+                               struct inode *inode, __u32 *gen, bool reconf)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_sb_info    *sbi = ll_i2sbi(inode);
+       struct ldlm_lock *lock;
+       struct lustre_md md = { NULL };
+       struct cl_object_conf conf;
+       int rc = 0;
+       bool lvb_ready;
+       ENTRY;
+
+       LASSERT(lustre_handle_is_used(lockh));
+
+       lock = ldlm_handle2lock(lockh);
+       LASSERT(lock != NULL);
+       LASSERT(ldlm_has_layout(lock));
+
+       LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
+               inode, PFID(&lli->lli_fid), reconf);
+
+       lock_res_and_lock(lock);
+       lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
+       unlock_res_and_lock(lock);
+       /* checking lvb_ready is racy but this is okay. The worst case is
+        * that multi processes may configure the file on the same time. */
+       if (lvb_ready || !reconf) {
+               LDLM_LOCK_PUT(lock);
+
+               rc = -ENODATA;
+               if (lvb_ready) {
+                       /* layout_gen must be valid if layout lock is not
+                        * cancelled and stripe has already set */
+                       *gen = lli->lli_layout_gen;
+                       rc = 0;
+               }
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* for layout lock, lmm is returned in lock's lvb.
+        * lvb_data is immutable if the lock is held so it's safe to access it
+        * without res lock. See the description in ldlm_lock_decref_internal()
+        * for the condition to free lvb_data of layout lock */
+       if (lock->l_lvb_data != NULL) {
+               rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+                                 lock->l_lvb_data, lock->l_lvb_len);
+               if (rc >= 0) {
+                       if (md.lsm != NULL)
+                               *gen = md.lsm->lsm_layout_gen;
+                       rc = 0;
+               } else {
+                       CERROR("%s: file "DFID" unpackmd error: %d\n",
+                               ll_get_fsname(inode->i_sb, NULL, 0),
+                               PFID(&lli->lli_fid), rc);
+               }
+       }
+       if (rc < 0) {
+               LDLM_LOCK_PUT(lock);
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* set layout to file. Unlikely this will fail as old layout was
+        * surely eliminated */
+       memset(&conf, 0, sizeof conf);
+       conf.coc_opc = OBJECT_CONF_SET;
+       conf.coc_inode = inode;
+       conf.coc_lock = lock;
+       conf.u.coc_md = &md;
+       rc = ll_layout_conf(inode, &conf);
+       LDLM_LOCK_PUT(lock);
+
+       ldlm_lock_decref(lockh, mode);
+
+       if (md.lsm != NULL)
+               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+       /* wait for IO to complete if it's still being used. */
+       if (rc == -EBUSY) {
+               CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0),
+                       inode, PFID(&lli->lli_fid));
+
+               memset(&conf, 0, sizeof conf);
+               conf.coc_opc = OBJECT_CONF_WAIT;
+               conf.coc_inode = inode;
+               rc = ll_layout_conf(inode, &conf);
+               if (rc == 0)
+                       rc = -EAGAIN;
+
+               CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
+                       PFID(&lli->lli_fid), rc);
+       }
+
+       RETURN(rc);
+}
+
+/**
  * This function checks if there exists a LAYOUT lock on the client side,
  * or enqueues it if it doesn't have one in cache.
  *
@@ -2929,9 +3049,9 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
-       struct md_op_data     *op_data = NULL;
-       struct lookup_intent   it = { .it_op = IT_LAYOUT };
-       struct lustre_handle   lockh = { 0 };
+       struct md_op_data     *op_data;
+       struct lookup_intent   it;
+       struct lustre_handle   lockh;
        ldlm_mode_t            mode;
        struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
                                           .ei_mode = LCK_CR,
@@ -2941,7 +3061,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
-       *gen = 0;
+       *gen = LL_LAYOUT_GEN_ZERO;
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
@@ -2951,92 +3071,67 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               /* lsm_layout_gen is started from 0, plus 1 here to distinguish
-                * the cases of no layout and first layout. */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
+               if (rc == 0)
+                       RETURN(0);
 
-               ldlm_lock_decref(&lockh, mode);
-               RETURN(0);
+               /* better hold lli_layout_mutex to try again otherwise
+                * it will have starvation problem. */
        }
 
-       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
-                                    0, 0, LUSTRE_OPC_ANY, NULL);
-       if (IS_ERR(op_data))
-               RETURN(PTR_ERR(op_data));
-
        /* take layout lock mutex to enqueue layout lock exclusively. */
        mutex_lock(&lli->lli_layout_mutex);
 
-       /* try again inside layout mutex */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+again:
+       /* try again. Maybe somebody else has done this. */
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
 
-               ldlm_lock_decref(&lockh, mode);
                mutex_unlock(&lli->lli_layout_mutex);
-               ll_finish_md_op_data(op_data);
-               RETURN(0);
+               RETURN(rc);
+       }
+
+       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
+                       0, 0, LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               mutex_unlock(&lli->lli_layout_mutex);
+               RETURN(PTR_ERR(op_data));
        }
 
        /* have to enqueue one */
+       memset(&it, 0, sizeof(it));
+       it.it_op = IT_LAYOUT;
+       lockh.cookie = 0ULL;
+
+       LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0), inode,
+                       PFID(&lli->lli_fid));
+
        rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
                        NULL, 0, NULL, 0);
        if (it.d.lustre.it_data != NULL)
                ptlrpc_req_finished(it.d.lustre.it_data);
        it.d.lustre.it_data = NULL;
 
-       if (rc == 0) {
-               struct ldlm_lock *lock;
-               struct cl_object_conf conf;
-               struct lustre_md md = { NULL };
-               void *lmm;
-               int lmmsize;
+       ll_finish_md_op_data(op_data);
 
-               LASSERT(lustre_handle_is_used(&lockh));
+       mode = it.d.lustre.it_lock_mode;
+       it.d.lustre.it_lock_mode = 0;
+       ll_intent_drop_lock(&it);
 
+       if (rc == 0) {
                /* set lock data in case this is a new lock */
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-
-               lock = ldlm_handle2lock(&lockh);
-               LASSERT(lock != NULL);
-
-               /* for IT_LAYOUT lock, lmm is returned in lock's lvb
-                * data via completion callback */
-               lmm = lock->l_lvb_data;
-               lmmsize = lock->l_lvb_len;
-               if (lmm != NULL) {
-                       rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
-                                       lmm, lmmsize);
-                       if (rc >= 0) {
-                               if (md.lsm != NULL)
-                                       *gen = md.lsm->lsm_layout_gen + 1;
-                               rc = 0;
-                       } else {
-                               CERROR("file: "DFID" unpackmd error: %d\n",
-                                       PFID(&lli->lli_fid), rc);
-                       }
-               }
-               LDLM_LOCK_PUT(lock);
-
-               /* set layout to file. This may cause lock expiration as we
-                * set layout inside layout ibits lock. */
-               memset(&conf, 0, sizeof conf);
-               conf.coc_inode = inode;
-               conf.u.coc_md = &md;
-               ll_layout_conf(inode, &conf);
-               /* is this racy? */
-               lli->lli_has_smd = md.lsm != NULL;
-               if (md.lsm != NULL)
-                       obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
        }
-       ll_intent_drop_lock(&it);
-
        mutex_unlock(&lli->lli_layout_mutex);
-       ll_finish_md_op_data(op_data);
 
        RETURN(rc);
 }
index a2d0e22..90c5790 100644 (file)
@@ -832,7 +832,7 @@ int ll_show_options(struct seq_file *seq, struct vfsmount *vfs);
 #endif
 void ll_dirty_page_discard_warn(cfs_page_t *page, int ioret);
 int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
-                 struct super_block *);
+                 struct super_block *, struct lookup_intent *);
 void lustre_dump_dentry(struct dentry *, int recur);
 void lustre_dump_inode(struct inode *);
 int ll_obd_statfs(struct inode *inode, void *arg);
@@ -1573,6 +1573,7 @@ struct if_quotactl_18 {
 #warning "remove old LL_IOC_QUOTACTL_18 compatibility code"
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) */
 
+#define LL_LAYOUT_GEN_ZERO     ((__u32)-1)
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 
index 38bd420..cf7c2c2 100644 (file)
@@ -221,7 +221,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR    |
                                   OBD_CONNECT_FULL20   | OBD_CONNECT_64BITHASH|
                                  OBD_CONNECT_EINPROGRESS |
-                                 OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE;
+                                 OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
+                                 OBD_CONNECT_LAYOUTLOCK;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
@@ -404,7 +405,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH |
                                   OBD_CONNECT_MAXBYTES |
                                  OBD_CONNECT_EINPROGRESS |
-                                 OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE;
+                                 OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
+                                 OBD_CONNECT_LAYOUTLOCK;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
@@ -923,6 +925,7 @@ void ll_lli_init(struct ll_inode_info *lli)
        mutex_init(&lli->lli_och_mutex);
        spin_lock_init(&lli->lli_agl_lock);
        lli->lli_has_smd = false;
+       lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
        lli->lli_clob = NULL;
 
        LASSERT(lli->lli_vfs_inode.i_mode != 0);
@@ -1701,21 +1704,9 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
 
        LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
        if (lsm != NULL) {
-               LASSERT(S_ISREG(inode->i_mode));
-               CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
-                               lsm, inode->i_ino, inode->i_generation, inode);
-               /* cl_file_inode_init must go before lli_has_smd or a race
-                * is possible where client thinks the file has stripes,
-                * but lov raid0 is not setup yet and parallel e.g.
-                * glimpse would try to use uninitialized lov */
-               if (cl_file_inode_init(inode, md) == 0)
-                       lli->lli_has_smd = true;
-
                lli->lli_maxbytes = lsm->lsm_maxbytes;
                if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
                        lli->lli_maxbytes = MAX_LFS_FILESIZE;
-               if (md->lsm != NULL)
-                       obd_free_memmd(ll_i2dtexp(inode), &md->lsm);
        }
 
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
@@ -2134,13 +2125,11 @@ int ll_remount_fs(struct super_block *sb, int *flags, char *data)
         return 0;
 }
 
-int ll_prep_inode(struct inode **inode,
-                  struct ptlrpc_request *req,
-                  struct super_block *sb)
+int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
+                 struct super_block *sb, struct lookup_intent *it)
 {
        struct ll_sb_info *sbi = NULL;
        struct lustre_md md;
-       __u64 ibits;
         int rc;
         ENTRY;
 
@@ -2164,8 +2153,6 @@ int ll_prep_inode(struct inode **inode,
 
                 *inode = ll_iget(sb, cl_fid_build_ino(&md.body->fid1, 0), &md);
                 if (*inode == NULL || IS_ERR(*inode)) {
-                        if (md.lsm)
-                                obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
 #ifdef CONFIG_FS_POSIX_ACL
                         if (md.posix_acl) {
                                 posix_acl_release(md.posix_acl);
@@ -2179,16 +2166,37 @@ int ll_prep_inode(struct inode **inode,
                 }
         }
 
-       /* sanity check for LAYOUT lock. */
-       ibits = MDS_INODELOCK_LAYOUT;
-       if (S_ISREG(md.body->mode) && sbi->ll_flags & LL_SBI_LAYOUT_LOCK &&
-           md.lsm != NULL && !ll_have_md_lock(*inode, &ibits, LCK_MINMODE)) {
-               CERROR("%s: inode "DFID" (%p) layout lock not granted.\n",
-                       ll_get_fsname(sb, NULL, 0),
-                       PFID(ll_inode2fid(*inode)), *inode);
+       /* Handling piggyback layout lock.
+        * Layout lock can be piggybacked by getattr and open request.
+        * The lsm can be applied to inode only if it comes with a layout lock
+        * otherwise correct layout may be overwritten, for example:
+        * 1. proc1: mdt returns a lsm but not granting layout
+        * 2. layout was changed by another client
+        * 3. proc2: refresh layout and layout lock granted
+        * 4. proc1: to apply a stale layout */
+       if (it != NULL && it->d.lustre.it_lock_mode != 0) {
+               struct lustre_handle lockh;
+               struct ldlm_lock *lock;
+
+               lockh.cookie = it->d.lustre.it_lock_handle;
+               lock = ldlm_handle2lock(&lockh);
+               LASSERT(lock != NULL);
+               if (ldlm_has_layout(lock)) {
+                       struct cl_object_conf conf;
+
+                       memset(&conf, 0, sizeof(conf));
+                       conf.coc_opc = OBJECT_CONF_SET;
+                       conf.coc_inode = *inode;
+                       conf.coc_lock = lock;
+                       conf.u.coc_md = &md;
+                       (void)ll_layout_conf(*inode, &conf);
+               }
+               LDLM_LOCK_PUT(lock);
        }
 
 out:
+       if (md.lsm != NULL)
+               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
        md_free_lustre_md(sbi->ll_md_exp, &md);
        RETURN(rc);
 }
index 414f935..eeb24a5 100644 (file)
@@ -106,7 +106,7 @@ static struct inode *search_inode_for_lustre(struct super_block *sb,
                        PFID(fid), rc);
                 RETURN(ERR_PTR(rc));
         }
-        rc = ll_prep_inode(&inode, req, sb);
+        rc = ll_prep_inode(&inode, req, sb, NULL);
         ptlrpc_req_finished(req);
         if (rc)
                 RETURN(ERR_PTR(rc));
index 1d39dfd..3622909 100644 (file)
@@ -150,10 +150,14 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash,
 
                         ll_read_inode2(inode, md);
                         if (S_ISREG(inode->i_mode) &&
-                            ll_i2info(inode)->lli_clob == NULL)
+                            ll_i2info(inode)->lli_clob == NULL) {
+                               CDEBUG(D_INODE,
+                                       "%s: apply lsm %p to inode "DFID".\n",
+                                       ll_get_fsname(sb, NULL, 0), md->lsm,
+                                       PFID(ll_inode2fid(inode)));
                                 rc = cl_file_inode_init(inode, md);
+                       }
                         if (rc != 0) {
-                                md->lsm = NULL;
                                 make_bad_inode(inode);
                                 unlock_new_inode(inode);
                                 iput(inode);
@@ -258,8 +262,10 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                lli = ll_i2info(inode);
                if (bits & MDS_INODELOCK_LAYOUT) {
-                       struct cl_object_conf conf = { .coc_inode = inode,
-                                                      .coc_invalidate = true };
+                       struct cl_object_conf conf = { { 0 } };
+
+                       conf.coc_opc = OBJECT_CONF_INVALIDATE;
+                       conf.coc_inode = inode;
                        rc = ll_layout_conf(inode, &conf);
                        if (rc)
                                CDEBUG(D_INODE, "invaliding layout %d.\n", rc);
@@ -427,7 +433,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request,
        CDEBUG(D_DENTRY, "it %p it_disposition %x\n", it,
               it->d.lustre.it_disposition);
        if (!it_disposition(it, DISP_LOOKUP_NEG)) {
-                rc = ll_prep_inode(&inode, request, (*de)->d_sb);
+                rc = ll_prep_inode(&inode, request, (*de)->d_sb, it);
                 if (rc)
                         RETURN(rc);
 
@@ -794,7 +800,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
         LASSERT(it_disposition(it, DISP_ENQ_CREATE_REF));
         request = it->d.lustre.it_data;
         it_clear_disposition(it, DISP_ENQ_CREATE_REF);
-        rc = ll_prep_inode(&inode, request, dir->i_sb);
+        rc = ll_prep_inode(&inode, request, dir->i_sb, it);
         if (rc)
                 GOTO(out, inode = ERR_PTR(rc));
 
@@ -898,7 +904,7 @@ static int ll_new_node(struct inode *dir, struct qstr *name,
         ll_update_times(request, dir);
 
         if (dchild) {
-                err = ll_prep_inode(&inode, request, dchild->d_sb);
+                err = ll_prep_inode(&inode, request, dchild->d_sb, NULL);
                 if (err)
                      GOTO(err_exit, err);
 
index 98c40e3..b066a23 100644 (file)
@@ -692,7 +692,7 @@ static void do_statahead_interpret(struct ll_statahead_info *sai,
         if (rc != 1)
                 GOTO(out, rc = -EAGAIN);
 
-        rc = ll_prep_inode(&child, req, dir->i_sb);
+        rc = ll_prep_inode(&child, req, dir->i_sb, it);
         if (rc)
                 GOTO(out, rc);
 
index 3fd1fb8..401c4ad 100644 (file)
@@ -1180,6 +1180,12 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
                        io->ci_lockreq = CILR_MANDATORY;
        }
 
+       /* ignore layout change for generic CIT_MISC but not for glimpse.
+        * io context for glimpse must set ci_verify_layout to true,
+        * see cl_glimpse_size0() for details. */
+       if (io->ci_type == CIT_MISC && !io->ci_verify_layout)
+               io->ci_ignore_layout = 1;
+
        /* Enqueue layout lock and get layout version. We need to do this
         * even for operations requiring to open file, such as read and write,
         * because it might not grant layout lock in IT_OPEN. */
index f112744..20dcdf9 100644 (file)
@@ -128,9 +128,23 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
 {
        struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
 
-       if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
+       if (conf->coc_opc != OBJECT_CONF_SET)
+               return 0;
+
+       if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) {
+               CDEBUG(D_VFSTRACE, "layout lock change: %u -> %u\n",
+                       lli->lli_layout_gen,
+                       conf->u.coc_md->lsm->lsm_layout_gen);
+
+               lli->lli_has_smd = true;
                lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
+       } else {
+               CDEBUG(D_VFSTRACE, "layout lock destroyed: %u.\n",
+                       lli->lli_layout_gen);
 
+               lli->lli_has_smd = false;
+               lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
+       }
        return 0;
 }
 
index 85958ec..1241f5f 100644 (file)
@@ -1205,6 +1205,7 @@ static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
        return 0;
 }
 
+/* XXX: more methods will be added later. */
 static const struct cl_lock_operations lov_empty_lock_ops = {
        .clo_fini  = lov_empty_lock_fini,
        .clo_print = lov_empty_lock_print
index 88124bf..2908bc0 100644 (file)
@@ -133,6 +133,12 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
         int result;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
+               /* For sanity:test_206.
+                * Do not leave the object in cache to avoid accessing
+                * freed memory. This is because osc_object is referring to
+                * lov_oinfo of lsm_stripe_data which will be freed due to
+                * this failure. */
+               cl_object_kill(env, stripe);
                cl_object_put(env, stripe);
                return -EIO;
        }
@@ -541,8 +547,13 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
                RETURN(0);
 
        LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
-       while (cfs_atomic_read(&lsm->lsm_refc) > 1) {
+       while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) {
                lov_conf_unlock(lov);
+
+               CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
+                       PFID(lu_object_fid(lov2lu(lov))),
+                       cfs_atomic_read(&lsm->lsm_refc));
+
                l_wait_event(lov->lo_waitq,
                             cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
                lov_conf_lock(lov);
@@ -642,23 +653,34 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        ENTRY;
 
        lov_conf_lock(lov);
-       if (conf->coc_invalidate) {
+       if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
                lov->lo_lsm_invalid = 1;
                GOTO(out, result = 0);
        }
 
+       if (conf->coc_opc == OBJECT_CONF_WAIT) {
+               result = lov_layout_wait(env, lov);
+               GOTO(out, result);
+       }
+
+       LASSERT(conf->coc_opc == OBJECT_CONF_SET);
+
        if (conf->u.coc_md != NULL)
                lsm = conf->u.coc_md->lsm;
-
        if ((lsm == NULL && lov->lo_lsm == NULL) ||
            (lsm != NULL && lov->lo_lsm != NULL &&
             lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+               /* same version of layout */
                lov->lo_lsm_invalid = 0;
                GOTO(out, result = 0);
        }
 
-       /* will change layout */
-       lov_layout_wait(env, lov);
+       /* will change layout - check if there still exists active IO. */
+       if (lov->lo_lsm != NULL &&
+           cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) {
+               lov->lo_lsm_invalid = 1;
+               GOTO(out, result = -EBUSY);
+       }
 
        /*
         * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
index 93d9f4b..c17feed 100644 (file)
@@ -437,6 +437,45 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
         RETURN(req);
 }
 
+static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
+                                                    struct lookup_intent *it,
+                                                    struct md_op_data *unused)
+{
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct ptlrpc_request *req;
+       struct ldlm_intent    *lit;
+       struct layout_intent  *layout;
+       int rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                               &RQF_LDLM_INTENT_LAYOUT);
+       if (req == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
+       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(ERR_PTR(rc));
+       }
+
+       /* pack the intent */
+       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+       lit->opc = (__u64)it->it_op;
+
+       /* pack the layout intent request */
+       layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
+       /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
+        * set for replication */
+       layout->li_opc = LAYOUT_INTENT_ACCESS;
+
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                       obd->u.cli.cl_max_mds_easize);
+       ptlrpc_request_set_replen(req);
+       RETURN(req);
+}
+
 static struct ptlrpc_request *
 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
 {
@@ -470,6 +509,9 @@ static int mdc_finish_enqueue(struct obd_export *exp,
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        struct lustre_intent_data *intent = &it->d.lustre;
+       struct ldlm_lock    *lock;
+       void                *lvb_data = NULL;
+       int                  lvb_len = 0;
         ENTRY;
 
         LASSERT(rc >= 0);
@@ -485,8 +527,8 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
         } else { /* rc = 0 */
-                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
-                LASSERT(lock);
+               lock = ldlm_handle2lock(lockh);
+               LASSERT(lock != NULL);
 
                 /* If the server gave us back a different lock mode, we should
                  * fix up our variables. */
@@ -547,12 +589,10 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         mdc_set_open_replay_data(NULL, NULL, req);
                }
 
-               /* TODO: make sure LAYOUT lock must be granted along with EA */
-
                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                         void *eadata;
 
-                         mdc_update_max_ea_from_body(exp, body);
+                       mdc_update_max_ea_from_body(exp, body);
 
                         /*
                          * The eadata is opaque; just check that it is there.
@@ -563,6 +603,11 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         if (eadata == NULL)
                                 RETURN(-EPROTO);
 
+                       /* save lvb data and length in case this is for layout
+                        * lock */
+                       lvb_data = eadata;
+                       lvb_len = body->eadatasize;
+
                         /*
                          * We save the reply LOV EA in case we have to replay a
                          * create for recovery.  If we didn't allocate a large
@@ -624,44 +669,45 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                                 RETURN(-EPROTO);
                 }
         } else if (it->it_op & IT_LAYOUT) {
-                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+               /* maybe the lock was granted right away and layout
+                * is packed into RMF_DLM_LVB of req */
+               lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+               if (lvb_len > 0) {
+                       lvb_data = req_capsule_server_sized_get(pill,
+                                                       &RMF_DLM_LVB, lvb_len);
+                       if (lvb_data == NULL)
+                               RETURN(-EPROTO);
+               }
+       }
 
-               if (lock != NULL && lock->l_lvb_data == NULL) {
-                       int lvb_len;
+       /* fill in stripe data for layout lock */
+       lock = ldlm_handle2lock(lockh);
+       if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
+               void *lmm;
 
-                       /* maybe the lock was granted right away and layout
-                        * is packed into RMF_DLM_LVB of req */
-                       lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
-                                                      RCL_SERVER);
-                       if (lvb_len > 0) {
-                               void *lvb;
-                               void *lmm;
+               LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
+                       ldlm_it2str(it->it_op), lvb_len);
 
-                               lvb = req_capsule_server_sized_get(pill,
-                                                       &RMF_DLM_LVB, lvb_len);
-                               if (lvb == NULL) {
-                                       LDLM_LOCK_PUT(lock);
-                                       RETURN(-EPROTO);
-                               }
-
-                               OBD_ALLOC_LARGE(lmm, lvb_len);
-                               if (lmm == NULL) {
-                                       LDLM_LOCK_PUT(lock);
-                                       RETURN(-ENOMEM);
-                               }
-                               memcpy(lmm, lvb, lvb_len);
-
-                               /* install lvb_data */
-                               lock_res_and_lock(lock);
-                               LASSERT(lock->l_lvb_data == NULL);
-                               lock->l_lvb_data = lmm;
-                               lock->l_lvb_len = lvb_len;
-                               unlock_res_and_lock(lock);
-                       }
-               }
-               if (lock != NULL)
+               OBD_ALLOC_LARGE(lmm, lvb_len);
+               if (lmm == NULL) {
                        LDLM_LOCK_PUT(lock);
+                       RETURN(-ENOMEM);
+               }
+               memcpy(lmm, lvb_data, lvb_len);
+
+               /* install lvb_data */
+               lock_res_and_lock(lock);
+               if (lock->l_lvb_data == NULL) {
+                       lock->l_lvb_data = lmm;
+                       lock->l_lvb_len = lvb_len;
+                       lmm = NULL;
+               }
+               unlock_res_and_lock(lock);
+               if (lmm != NULL)
+                       OBD_FREE_LARGE(lmm, lvb_len);
        }
+       if (lock != NULL)
+               LDLM_LOCK_PUT(lock);
 
        RETURN(rc);
 }
@@ -732,7 +778,7 @@ resend:
                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
                        RETURN(-EOPNOTSUPP);
 
-               req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
+               req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else {
                 LBUG();
@@ -810,9 +856,15 @@ resend:
                 }
         }
 
-        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
-
-        RETURN(rc);
+       rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+       if (rc < 0) {
+               if (lustre_handle_is_used(lockh)) {
+                       ldlm_lock_decref(lockh, einfo->ei_mode);
+                       memset(lockh, 0, sizeof(*lockh));
+               }
+               ptlrpc_req_finished(req);
+       }
+       RETURN(rc);
 }
 
 static int mdc_finish_intent_lock(struct obd_export *exp,
@@ -1022,7 +1074,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
 
         lockh.cookie = 0;
         if (fid_is_sane(&op_data->op_fid2) &&
-            (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
+            (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
index a44945c..4e266be 100644 (file)
@@ -1246,13 +1246,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 LDLM_LOCK_PUT(lock);
                 rc = 0;
         } else {
+               bool try_layout = false;
+
 relock:
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
-                if (child_bits == MDS_INODELOCK_LAYOUT)
-                        mdt_lock_reg_init(lhc, LCK_CR);
-                else
-                        mdt_lock_reg_init(lhc, LCK_PR);
+               mdt_lock_reg_init(lhc, LCK_PR);
 
                 if (mdt_object_exists(child) == 0) {
                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
@@ -1270,12 +1269,6 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
-                        /* layout lock is used only on regular files */
-                        if ((ma->ma_valid & MA_INODE) &&
-                            (ma->ma_attr.la_valid & LA_MODE) &&
-                            !S_ISREG(ma->ma_attr.la_mode))
-                                child_bits &= ~MDS_INODELOCK_LAYOUT;
-
                         /* If the file has not been changed for some time, we
                          * return not only a LOOKUP lock, but also an UPDATE
                          * lock and this might save us RPC on later STAT. For
@@ -1288,9 +1281,35 @@ relock:
                                 child_bits |= MDS_INODELOCK_UPDATE;
                 }
 
-                rc = mdt_object_lock(info, child, lhc, child_bits,
-                                     MDT_CROSS_LOCK);
+               /* layout lock must be granted in a best-effort way
+                * for IT operations */
+               LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
+               if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+                   exp_connect_layout(info->mti_exp) &&
+                   S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) &&
+                   ldlm_rep != NULL) {
+                       /* try to grant layout lock for regular file. */
+                       try_layout = true;
+               }
 
+               rc = 0;
+               if (try_layout) {
+                       child_bits |= MDS_INODELOCK_LAYOUT;
+                       /* try layout lock, it may fail to be granted due to
+                        * contention at LOOKUP or UPDATE */
+                       if (!mdt_object_lock_try(info, child, lhc, child_bits,
+                                                MDT_CROSS_LOCK)) {
+                               child_bits &= ~MDS_INODELOCK_LAYOUT;
+                               LASSERT(child_bits != 0);
+                               rc = mdt_object_lock(info, child, lhc,
+                                               child_bits, MDT_CROSS_LOCK);
+                       } else {
+                               ma_need |= MA_LOV;
+                       }
+               } else {
+                       rc = mdt_object_lock(info, child, lhc, child_bits,
+                                               MDT_CROSS_LOCK);
+               }
                 if (unlikely(rc != 0))
                         GOTO(out_child, rc);
         }
@@ -1300,7 +1319,7 @@ relock:
         if (lock &&
             lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
             S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
-                ma_need = MA_SOM;
+                ma_need |= MA_SOM;
 
         /* finally, we can get attr for child. */
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
@@ -2131,6 +2150,7 @@ int mdt_llog_prev_block(struct mdt_thread_info *info)
 /*
  * DLM handlers.
  */
+
 static struct ldlm_callback_suite cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = ldlm_server_blocking_ast,
@@ -2362,12 +2382,14 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(rc);
 }
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                    struct mdt_lock_handle *lh, __u64 ibits, int locality)
+static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
+                           struct mdt_lock_handle *lh, __u64 ibits,
+                           bool nonblock, int locality)
 {
         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         ldlm_policy_data_t *policy = &info->mti_policy;
         struct ldlm_res_id *res_id = &info->mti_res_id;
+       __u64 dlmflags;
         int rc;
         ENTRY;
 
@@ -2404,6 +2426,10 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         memset(policy, 0, sizeof(*policy));
         fid_build_reg_res_name(mdt_object_fid(o), res_id);
 
+       dlmflags = LDLM_FL_ATOMIC_CB;
+       if (nonblock)
+               dlmflags |= LDLM_FL_BLOCK_NOWAIT;
+
         /*
          * Take PDO lock on whole directory and build correct @res_id for lock
          * on part of directory.
@@ -2419,7 +2445,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                          */
                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                          policy, res_id, LDLM_FL_ATOMIC_CB,
+                                          policy, res_id, dlmflags,
                                           &info->mti_exp->exp_handle.h_cookie);
                         if (unlikely(rc))
                                 RETURN(rc);
@@ -2440,7 +2466,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
          * fix it up and turn FL_LOCAL flag off.
          */
         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
                           &info->mti_exp->exp_handle.h_cookie);
         if (rc)
                 mdt_object_unlock(info, o, lh, 1);
@@ -2453,6 +2479,25 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         RETURN(rc);
 }
 
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                   struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+       return mdt_object_lock0(info, o, lh, ibits, false, locality);
+}
+
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+                       struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+       struct mdt_lock_handle tmp = *lh;
+       int rc;
+
+       rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+       if (rc == 0)
+               *lh = tmp;
+
+       return rc == 0;
+}
+
 /**
  * Save a lock within request object.
  *
@@ -3215,6 +3260,10 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                               struct mdt_thread_info *info,
                               struct ldlm_lock **,
                              __u64);
+static int mdt_intent_layout(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **,
+                            __u64);
 static int mdt_intent_reint(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **,
@@ -3279,11 +3328,11 @@ static struct mdt_it_flavor {
                 .it_flags = 0,
                 .it_act   = NULL
         },
-        [MDT_IT_LAYOUT] = {
-                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
-                .it_flags = HABEO_REFERO,
-                .it_act   = mdt_intent_getattr
-        }
+       [MDT_IT_LAYOUT] = {
+               .it_fmt   = &RQF_LDLM_INTENT_LAYOUT,
+               .it_flags = 0,
+               .it_act   = mdt_intent_layout
+       }
 };
 
 int mdt_intent_lock_replace(struct mdt_thread_info *info,
@@ -3460,16 +3509,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         case MDT_IT_GETATTR:
                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
-        case MDT_IT_LAYOUT: {
-                static int printed = 0;
-
-                if (!printed) {
-                        CERROR("layout lock not supported by this version\n");
-                        printed = 1;
-                }
-                GOTO(out_shrink, rc = -EINVAL);
-                break;
-        }
         default:
                 CERROR("Unsupported intent (%d)\n", opcode);
                 GOTO(out_shrink, rc = -EINVAL);
@@ -3509,6 +3548,39 @@ out_shrink:
         return rc;
 }
 
+static int mdt_intent_layout(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **lockp,
+                            __u64 flags)
+{
+       struct layout_intent *layout;
+       int rc;
+       ENTRY;
+
+       if (opcode != MDT_IT_LAYOUT) {
+               CERROR("%s: Unknown intent (%d)\n",
+                       info->mti_exp->exp_obd->obd_name, opcode);
+               RETURN(-EINVAL);
+       }
+
+       (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+       req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+                       ldlm_lvbo_size(*lockp));
+       rc = req_capsule_server_pack(info->mti_pill);
+       if (rc != 0)
+               RETURN(-EINVAL);
+
+       layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+       LASSERT(layout != NULL);
+       if (layout->li_opc == LAYOUT_INTENT_ACCESS)
+               /* return to normal ldlm handling */
+               RETURN(0);
+
+       CERROR("%s: Unsupported layout intent (%d)\n",
+               info->mti_exp->exp_obd->obd_name, layout->li_opc);
+       RETURN(-EINVAL);
+}
+
 static int mdt_intent_reint(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **lockp,
@@ -3585,7 +3657,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                   */
                 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                         LASSERTF(rc == 0, "Error occurred but lock handle "
-                                 "is still in use\n");
+                                 "is still in use, rc = %d\n", rc);
                         rep->lock_policy_res2 = 0;
                         rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
                         RETURN(rc);
@@ -3672,14 +3744,6 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
                RETURN(rc);
        }
 
-       if (opc == MDT_IT_LAYOUT) {
-               (*lockp)->l_lvb_type = LVB_T_LAYOUT;
-               /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT
-                *      lock enabled. */
-       } else if (opc == MDT_IT_READDIR) {
-               req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
-       }
-
        flv  = &mdt_it_flavor[opc];
         if (flv->it_fmt != NULL)
                 req_capsule_extend(pill, flv->it_fmt);
index 262ac79..1f6f78d 100644 (file)
@@ -245,6 +245,7 @@ enum {
         MDT_LH_PARENT, /* parent lockh */
         MDT_LH_CHILD,  /* child lockh */
         MDT_LH_OLD,    /* old lockh for rename */
+       MDT_LH_LAYOUT = MDT_LH_OLD, /* layout lock */
         MDT_LH_NEW,    /* new lockh for rename */
         MDT_LH_RMT,    /* used for return lh to caller */
         MDT_LH_NR
@@ -569,6 +570,11 @@ int mdt_object_lock(struct mdt_thread_info *,
                     struct mdt_lock_handle *,
                     __u64, int);
 
+int mdt_object_lock_try(struct mdt_thread_info *,
+                       struct mdt_object *,
+                       struct mdt_lock_handle *,
+                       __u64, int);
+
 void mdt_object_unlock(struct mdt_thread_info *,
                        struct mdt_object *,
                        struct mdt_lock_handle *,
index 91df890..40a525e 100644 (file)
@@ -72,10 +72,13 @@ static int mdt_lvbo_update(struct ldlm_resource *res,
 
 static int mdt_lvbo_size(struct ldlm_lock *lock)
 {
-       if (IS_LQUOTA_RES(lock->l_resource)) {
-               struct mdt_device       *mdt;
+       struct mdt_device *mdt;
+
+       /* resource on server side never changes. */
+       mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+       LASSERT(mdt != NULL);
 
-               mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+       if (IS_LQUOTA_RES(lock->l_resource)) {
                if (mdt->mdt_qmt_dev == NULL)
                        return 0;
 
@@ -83,24 +86,93 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
                return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
        }
 
+       if (ldlm_has_layout(lock))
+               return mdt->mdt_max_mdsize;
+
        return 0;
 }
 
 static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 {
+       struct lu_env env;
+       struct mdt_thread_info *info;
+       struct mdt_device *mdt;
+       struct lu_fid *fid;
+       struct mdt_object *obj = NULL;
+       struct md_object *child = NULL;
+       int rc;
+       ENTRY;
+
+       mdt = ldlm_lock_to_ns(lock)->ns_lvbp;
        if (IS_LQUOTA_RES(lock->l_resource)) {
-               struct mdt_device       *mdt;
-
-               mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
                if (mdt->mdt_qmt_dev == NULL)
-                       return 0;
+                       RETURN(0);
 
                /* call lvbo fill function of quota master */
-               return qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
-                                              lvblen);
+               rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
+                                            lvblen);
+               RETURN(rc);
        }
 
-       return 0;
+       if (lock->l_resource->lr_type != LDLM_IBITS ||
+               !(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT))
+               RETURN(0);
+
+       /* layout lock will be granted to client, fill in lvb with layout */
+
+       /* XXX create an env to talk to mdt stack. We should get this env from
+        * ptlrpc_thread->t_env. */
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       LASSERT(rc == 0);
+
+       info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+       LASSERT(info != NULL);
+       memset(info, 0, sizeof *info);
+       info->mti_env = &env;
+       info->mti_exp = lock->l_export;
+       info->mti_mdt = mdt;
+
+       /* XXX get fid by resource id. why don't include fid in ldlm_resource */
+       fid = &info->mti_tmp_fid2;
+       fid_build_from_res_name(fid, &lock->l_resource->lr_name);
+
+       obj = mdt_object_find(&env, info->mti_mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       if (mdt_object_exists(obj) <= 0)
+               GOTO(out, rc = -ENOENT);
+
+       child = mdt_object_child(obj);
+
+       /* get the length of lsm */
+       rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (rc > 0) {
+               struct lu_buf *lmm = NULL;
+
+               if (lvblen < rc) {
+                       CERROR("%s: expected %d actual %d.\n",
+                               info->mti_exp->exp_obd->obd_name, rc, lvblen);
+                       GOTO(out, rc = -ERANGE);
+               }
+
+               lmm = &info->mti_buf;
+               lmm->lb_buf = lvb;
+               lmm->lb_len = rc;
+
+               rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
+
+out:
+       if (obj != NULL && !IS_ERR(obj))
+               mdt_object_put(&env, obj);
+       lu_env_fini(&env);
+       RETURN(rc < 0 ? 0 : rc);
 }
 
 static int mdt_lvbo_free(struct ldlm_resource *res)
index 4fc48c7..f370e5e 100644 (file)
@@ -1115,6 +1115,115 @@ int mdt_open_by_fid(struct mdt_thread_info* info,
         RETURN(rc);
 }
 
+/* lock object for open */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+                               struct mdt_object *obj,
+                               struct mdt_lock_handle *lhc,
+                               __u64 *ibits)
+{
+       struct md_attr *ma = &info->mti_attr;
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       ldlm_mode_t lm = LCK_CR;
+       bool try_layout = false;
+       bool create_layout = false;
+       int rc = 0;
+       ENTRY;
+
+       *ibits = 0;
+       if (open_flags & MDS_OPEN_LOCK) {
+               if (open_flags & FMODE_WRITE)
+                       lm = LCK_CW;
+               else if (open_flags & MDS_FMODE_EXEC)
+                       lm = LCK_PR;
+               else
+                       lm = LCK_CR;
+
+               *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+       }
+
+       if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) {
+               if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+                   md_should_create(open_flags))
+                       create_layout = true;
+               if (exp_connect_layout(info->mti_exp) && !create_layout &&
+                   ma->ma_need & MA_LOV)
+                       try_layout = true;
+       }
+
+       mdt_lock_handle_init(lhc);
+       mdt_lock_reg_init(lhc, lm);
+
+       /* one problem to return layout lock on open is that it may result
+        * in too many layout locks cached on the client side. */
+       if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+               /* return lookup lock to validate inode at the client side,
+                * this is pretty important otherwise mdt will return layout
+                * lock for each open.
+                * However this is a double-edged sword because changing
+                * permission will revoke huge # of LOOKUP locks. */
+               *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+               if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+                                        MDT_CROSS_LOCK)) {
+                       *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+                       if (*ibits != 0)
+                               rc = mdt_object_lock(info, obj, lhc, *ibits,
+                                               MDT_CROSS_LOCK);
+               }
+       } else if (*ibits != 0) {
+               rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+       }
+
+       CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+               ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+               PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+       /* will change layout, revoke layout locks by enqueuing EX lock. */
+       if (rc == 0 && create_layout) {
+               struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+               CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+                       ", open_flags = "LPO64"\n",
+                       PFID(mdt_object_fid(obj)), open_flags);
+
+               LASSERT(!try_layout);
+               mdt_lock_handle_init(ll);
+               mdt_lock_reg_init(ll, LCK_EX);
+               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+                                       MDT_LOCAL_LOCK);
+
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+       }
+
+       RETURN(rc);
+}
+
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+                                  struct mdt_object *obj,
+                                  struct mdt_lock_handle *lhc,
+                                  __u64 ibits, int rc)
+{
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+       /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT
+        * will never return to client side. */
+       if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
+               LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+               mdt_object_unlock(info, obj, ll, 1);
+       }
+
+       if (ibits == 0)
+               return;
+
+       if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+               /* for the open request, the lock will only return to client
+                * if open or layout lock is granted. */
+               rc = 1;
+       }
+       if (rc != 0)
+               mdt_object_unlock(info, obj, lhc, 1);
+}
+
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                         struct mdt_lock_handle *lhc)
 {
@@ -1126,7 +1235,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         struct mdt_object       *parent= NULL;
         struct mdt_object       *o;
         int                      rc;
-        ldlm_mode_t              lm;
+       __u64                    ibits;
         ENTRY;
 
        if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
@@ -1160,22 +1269,11 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                        DISP_LOOKUP_EXECD));
 
-        if (flags & FMODE_WRITE)
-                lm = LCK_CW;
-        else if (flags & MDS_FMODE_EXEC)
-                lm = LCK_PR;
-        else
-                lm = LCK_CR;
-
-        mdt_lock_handle_init(lhc);
-        mdt_lock_reg_init(lhc, lm);
-        rc = mdt_object_lock(info, o, lhc,
-                             MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                             MDT_CROSS_LOCK);
+       rc = mdt_attr_get_complex(info, o, ma);
         if (rc)
                 GOTO(out, rc);
 
-       rc = mdt_attr_get_complex(info, o, ma);
+       rc = mdt_object_open_lock(info, o, lhc, &ibits);
         if (rc)
                 GOTO(out, rc);
 
@@ -1191,18 +1289,15 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         }
 
         rc = mdt_finish_open(info, parent, o, flags, 0, rep);
-
-        if (!(flags & MDS_OPEN_LOCK) || rc)
-                mdt_object_unlock(info, o, lhc, 1);
-
        if (!rc) {
                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
                if (flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
        }
-
         GOTO(out, rc);
+
 out:
+       mdt_object_open_unlock(info, o, lhc, ibits, rc);
         mdt_object_put(env, o);
         if (parent != NULL)
                 mdt_object_put(env, parent);
@@ -1275,6 +1370,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
         struct md_attr          *ma = &info->mti_attr;
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
+       __u64                    ibits;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct lu_name          *lname;
         int                      result, rc;
@@ -1513,41 +1609,24 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
 
-        /* get openlock if this is not replay and if a client requested it */
-        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
-                ldlm_mode_t lm;
-
-                if (create_flags & FMODE_WRITE)
-                        lm = LCK_CW;
-                else if (create_flags & MDS_FMODE_EXEC)
-                        lm = LCK_PR;
-                else
-                        lm = LCK_CR;
-                mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, lm);
-                rc = mdt_object_lock(info, child, lhc,
-                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                                     MDT_CROSS_LOCK);
-                if (rc) {
-                        result = rc;
-                        GOTO(out_child, result);
-                } else {
-                        result = -EREMOTE;
-                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-                }
-        }
+       /* get openlock if this is not replay and if a client requested it */
+       if (!req_is_replay(req)) {
+               rc = mdt_object_open_lock(info, child, lhc, &ibits);
+               if (rc != 0) {
+                       GOTO(out_child, result = rc);
+               } else if (create_flags & MDS_OPEN_LOCK) {
+                       result = -EREMOTE;
+                       mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+               }
+       }
 
         /* Try to open it now. */
         rc = mdt_finish_open(info, parent, child, create_flags,
                              created, ldlm_rep);
         if (rc) {
                 result = rc;
-               if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                        /* openlock was acquired and mdt_finish_open failed -
-                           drop the openlock */
-                        mdt_object_unlock(info, child, lhc, 1);
-                       mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-               }
+               /* openlock will be released if mdt_finish_open failed */
+               mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
                 if (created) {
                         ma->ma_need = 0;
                         ma->ma_valid = 0;
@@ -1564,6 +1643,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         }
         EXIT;
 out_child:
+       mdt_object_open_unlock(info, child, lhc, ibits,
+                               result == -EREMOTE ? 0 : result);
         mdt_object_put(info->mti_env, child);
 out_parent:
         mdt_object_unlock_put(info, parent, lh, result || !created);
index deff55f..f696827 100644 (file)
@@ -383,6 +383,13 @@ static const struct req_msg_field *ldlm_intent_server[] = {
         &RMF_ACL
 };
 
+static const struct req_msg_field *ldlm_intent_layout_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_DLM_REQ,
+       &RMF_LDLM_INTENT,
+       &RMF_LAYOUT_INTENT,
+       &RMF_EADATA /* for new layout to be set up */
+};
 static const struct req_msg_field *ldlm_intent_open_server[] = {
         &RMF_PTLRPC_BODY,
         &RMF_DLM_REP,
@@ -642,6 +649,7 @@ static struct req_format *req_formats[] = {
        &RQF_LDLM_GL_DESC_CALLBACK,
         &RQF_LDLM_INTENT,
        &RQF_LDLM_INTENT_BASIC,
+        &RQF_LDLM_INTENT_LAYOUT,
         &RQF_LDLM_INTENT_GETATTR,
         &RQF_LDLM_INTENT_OPEN,
         &RQF_LDLM_INTENT_CREATE,
@@ -935,6 +943,12 @@ struct req_msg_field RMF_CAPA2 =
                     lustre_swab_lustre_capa, NULL);
 EXPORT_SYMBOL(RMF_CAPA2);
 
+struct req_msg_field RMF_LAYOUT_INTENT =
+       DEFINE_MSGF("layout_intent", 0,
+                   sizeof(struct layout_intent), lustre_swab_layout_intent,
+                   NULL);
+EXPORT_SYMBOL(RMF_LAYOUT_INTENT);
+
 /*
  * OST request field.
  */
@@ -1228,6 +1242,11 @@ struct req_format RQF_LDLM_INTENT =
                         ldlm_intent_client, ldlm_intent_server);
 EXPORT_SYMBOL(RQF_LDLM_INTENT);
 
+struct req_format RQF_LDLM_INTENT_LAYOUT =
+       DEFINE_REQ_FMT0("LDLM_INTENT_LAYOUT ",
+                       ldlm_intent_layout_client, ldlm_enqueue_lvb_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_LAYOUT);
+
 struct req_format RQF_LDLM_INTENT_GETATTR =
         DEFINE_REQ_FMT0("LDLM_INTENT_GETATTR",
                         ldlm_intent_getattr_client, ldlm_intent_getattr_server);
@@ -1377,7 +1396,6 @@ struct req_format RQF_OST_GET_INFO_FIEMAP =
                                                ost_get_fiemap_server);
 EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP);
 
-
 #if !defined(__REQ_LAYOUT_USER__)
 
 /* Convenience macro */
index 3f06381..bb961d3 100644 (file)
@@ -2514,4 +2514,11 @@ void lustre_swab_hsm_progress(struct hsm_progress *hp)
 }
 EXPORT_SYMBOL(lustre_swab_hsm_progress);
 
-
+void lustre_swab_layout_intent(struct layout_intent *li)
+{
+       __swab32s(&li->li_opc);
+       __swab32s(&li->li_flags);
+       __swab64s(&li->li_start);
+       __swab64s(&li->li_end);
+}
+EXPORT_SYMBOL(lustre_swab_layout_intent);
index 3269fea..1201e46 100644 (file)
@@ -4161,5 +4161,39 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location));
         LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n",
                  (long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location));
+
+        /* Checks for struct layout_intent */
+        LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n",
+                 (long long)(int)sizeof(struct layout_intent));
+        LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_opc));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_opc));
+        LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_flags));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_flags));
+        LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_start));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_start));
+        LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_end));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_end));
+        LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n",
+                 (long long)LAYOUT_INTENT_ACCESS);
+        LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n",
+                 (long long)LAYOUT_INTENT_READ);
+        LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n",
+                 (long long)LAYOUT_INTENT_WRITE);
+        LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n",
+                 (long long)LAYOUT_INTENT_GLIMPSE);
+        LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n",
+                 (long long)LAYOUT_INTENT_TRUNC);
+        LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n",
+                 (long long)LAYOUT_INTENT_RELEASE);
+        LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n",
+                 (long long)LAYOUT_INTENT_RESTORE);
 }
 
index 4d6fece..ab360c4 100644 (file)
@@ -50,6 +50,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <semaphore.h>
+#include <time.h>
 
 #include <lustre/lustreapi.h>
 
@@ -198,6 +199,7 @@ int main(int argc, char **argv)
         int save_errno;
         int verbose = 0;
         int gid = 0;
+       struct timespec ts;
 
         if (argc < 3) {
                 fprintf(stderr, usage, argv[0]);
@@ -219,7 +221,12 @@ int main(int argc, char **argv)
                                 printf("PAUSING\n");
                                 fflush(stdout);
                         }
-                        while (sem_wait(&sem) == -1 && errno == EINTR);
+                       len = atoi(commands+1);
+                       if (len <= 0)
+                               len = 3600; /* 1 hour */
+                       ts.tv_sec = time(NULL) + len;
+                       ts.tv_nsec = 0;
+                        while (sem_timedwait(&sem, &ts) < 0 && errno == EINTR);
                         break;
                 case 'c':
                         if (close(fd) == -1) {
index 739b693..495570f 100644 (file)
@@ -9776,6 +9776,41 @@ test_206() {
 }
 run_test 206 "fail lov_init_raid0() doesn't lbug"
 
+test_207a() {
+       dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1))
+       local fsz=`stat -c %s $DIR/$tfile`
+       cancel_lru_locks mdc
+
+       # do not return layout in getattr intent
+#define OBD_FAIL_MDS_NO_LL_GETATTR 0x170
+       $LCTL set_param fail_loc=0x170
+       local sz=`stat -c %s $DIR/$tfile`
+
+       [ $fsz -eq $sz ] || error "file size expected $fsz, actual $sz"
+
+       rm -rf $DIR/$tfile
+}
+run_test 207a "can refresh layout at glimpse"
+
+test_207b() {
+       dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1))
+       local cksum=`md5sum $DIR/$tfile`
+       local fsz=`stat -c %s $DIR/$tfile`
+       cancel_lru_locks mdc
+       cancel_lru_locks osc
+
+       # do not return layout in getattr intent
+#define OBD_FAIL_MDS_NO_LL_OPEN 0x171
+       $LCTL set_param fail_loc=0x171
+
+       # it will refresh layout after the file is opened but before read issues
+       echo checksum is "$cksum"
+       echo "$cksum" |md5sum -c --quiet || error "file differs"
+
+       rm -rf $DIR/$tfile
+}
+run_test 207b "can refresh layout at open"
+
 test_212() {
        size=`date +%s`
        size=$((size % 8192 + 1))
index d79f038..3a84dd4 100644 (file)
@@ -2258,6 +2258,81 @@ test_50() {
 }
 run_test 50 "osc lvb attrs: enqueue vs. CP AST =============="
 
+test_51a() {
+       local filesize
+       local origfile=/etc/hosts
+
+       filesize=`stat -c %s $origfile`
+
+       # create an empty file
+       $MCREATE $DIR1/$tfile
+       # cache layout lock on both mount point
+       stat $DIR1/$tfile > /dev/null
+       stat $DIR2/$tfile > /dev/null
+
+       # open and sleep 2 seconds then read
+       $MULTIOP $DIR2/$tfile o_2r${filesize}c &
+       local pid=$!
+       sleep 0.1
+
+       # create the layout of testing file
+       dd if=$origfile of=$DIR1/$tfile conv=notrunc > /dev/null
+
+       # MULTIOP proc should be able to read enough bytes and exit
+       sleep 2
+       kill -0 $pid && error "multiop is still there"
+       cmp $origfile $DIR2/$tfile || error "$MCREATE and $DIR2/$tfile differs"
+
+       rm -f $DIR1/$tfile
+}
+run_test 51a "layout lock: refresh layout should work"
+
+test_51b() {
+       local tmpfile=`mktemp`
+
+       # create an empty file
+       $MCREATE $DIR1/$tfile
+
+       # delay glimpse so that layout has changed when glimpse finish
+#define OBD_FAIL_GLIMPSE_DELAY 0x1404
+       $LCTL set_param fail_loc=0x1404
+       stat -c %s $DIR2/$tfile |tee $tmpfile &
+       local pid=$!
+       sleep 0.1
+
+       # create layout of testing file
+       dd if=/dev/zero of=$DIR1/$tfile bs=1k count=1 conv=notrunc > /dev/null
+
+       wait $pid
+       local fsize=`cat $tmpfile`
+
+       [ x$fsize = x1024 ] || error "file size is $fsize, should be 1024"
+
+       rm -f $DIR1/$tfile $tmpfile
+}
+run_test 51b "layout lock: glimpse should be able to restart if layout changed"
+
+test_51c() {
+       # create an empty file
+       $MCREATE $DIR1/$tfile
+
+#define OBD_FAIL_MDS_LL_BLOCK 0x172
+       $LCTL set_param fail_loc=0x172
+
+       # change the layout of testing file
+       echo "Setting layout ..."
+       $LFS setstripe -c $OSTCOUNT $DIR1/$tfile &
+       pid=$!
+       sleep 0.1
+
+       # get layout of this file should wait until dd is finished
+       local stripecnt=`$LFS getstripe -c $DIR2/$tfile`
+       [ $stripecnt -eq $OSTCOUNT ] || error "layout wrong"
+
+       rm -f $DIR1/$tfile
+}
+run_test 51c "layout lock: IT_LAYOUT blocked and correct layout can be returned"
+
 test_60() {
        [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.3.0) ]] ||
        { skip "Need MDS version at least 2.3.0"; return; }
index e68fdab..cc09b13 100644 (file)
@@ -59,6 +59,7 @@
 #define lustre_swab_ldlm_request NULL
 #define lustre_swab_ldlm_reply NULL
 #define lustre_swab_ldlm_intent NULL
+#define lustre_swab_layout_intent NULL
 /* #define lustre_swab_lov_mds_md NULL */
 #define lustre_swab_mdt_rec_reint NULL
 #define lustre_swab_lustre_capa NULL
index 7ed0777..0970513 100644 (file)
@@ -1880,6 +1880,24 @@ check_hsm_user_state(void)
         CHECK_MEMBER(hsm_user_state, hus_in_progress_location);
 }
 
+static void check_layout_intent(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(layout_intent);
+        CHECK_MEMBER(layout_intent, li_opc);
+        CHECK_MEMBER(layout_intent, li_flags);
+        CHECK_MEMBER(layout_intent, li_start);
+        CHECK_MEMBER(layout_intent, li_end);
+
+       CHECK_VALUE(LAYOUT_INTENT_ACCESS);
+       CHECK_VALUE(LAYOUT_INTENT_READ);
+       CHECK_VALUE(LAYOUT_INTENT_WRITE);
+       CHECK_VALUE(LAYOUT_INTENT_GLIMPSE);
+       CHECK_VALUE(LAYOUT_INTENT_TRUNC);
+       CHECK_VALUE(LAYOUT_INTENT_RELEASE);
+       CHECK_VALUE(LAYOUT_INTENT_RESTORE);
+}
+
 static void
 system_string (char *cmdline, char *str, int len)
 {
@@ -2234,6 +2252,7 @@ main(int argc, char **argv)
         check_hsm_user_item();
         check_hsm_user_request();
         check_hsm_user_state();
+       check_layout_intent();
 
         printf("}\n\n");
 
index b2a95cb..da171e3 100644 (file)
@@ -4169,5 +4169,39 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location));
         LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n",
                  (long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location));
+
+        /* Checks for struct layout_intent */
+        LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n",
+                 (long long)(int)sizeof(struct layout_intent));
+        LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_opc));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_opc));
+        LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_flags));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_flags));
+        LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_start));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_start));
+        LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n",
+                 (long long)(int)offsetof(struct layout_intent, li_end));
+        LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct layout_intent *)0)->li_end));
+        LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n",
+                 (long long)LAYOUT_INTENT_ACCESS);
+        LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n",
+                 (long long)LAYOUT_INTENT_READ);
+        LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n",
+                 (long long)LAYOUT_INTENT_WRITE);
+        LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n",
+                 (long long)LAYOUT_INTENT_GLIMPSE);
+        LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n",
+                 (long long)LAYOUT_INTENT_TRUNC);
+        LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n",
+                 (long long)LAYOUT_INTENT_RELEASE);
+        LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n",
+                 (long long)LAYOUT_INTENT_RESTORE);
 }