Whamcloud - gitweb
LU-169 lov: add basic infrastructure for layout lock
author Johann Lombardi <johann@whamcloud.com>
Tue, 10 Jan 2012 16:41:26 +0000 (17:41 +0100)
committer Oleg Drokin <green@whamcloud.com>
Tue, 24 Jan 2012 23:24:26 +0000 (18:24 -0500)
This patch adds some basic infrastructure to support the layout lock
in the near future. This includes defining a new inode lock bit to lock
the file layout (namely MDS_INODELOCK_LAYOUT) as well as a new lookup
intent (IT_LAYOUT).

Signed-off-by: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Change-Id: Ibf1c3c166b5def4654684febbcf3a99ea7e482eb
Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1854
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
15 files changed:
lustre/include/liblustre.h
lustre/include/lustre/lustre_idl.h
lustre/include/obd.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/llite_lib.h
lustre/llite/llite_lib.c
lustre/llite/rw26.c
lustre/lmv/lmv_intent.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_handler.c
lustre/obdclass/obdo.c
lustre/obdfilter/filter.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index a791d35..1048874 100644 (file)
@@ -421,7 +421,7 @@ typedef struct file_lock {
 #define i_atime                     i_stbuf.st_atime
 #define i_mtime                     i_stbuf.st_mtime
 #define i_ctime                     i_stbuf.st_ctime
-#define i_size                      i_stbuf.st_size
+/* use i_size_read() i_size_write() to access i_stbuf.st_size */
 #define i_blocks                    i_stbuf.st_blocks
 #define i_blksize                   i_stbuf.st_blksize
 #define i_mode                      i_stbuf.st_mode
index 4645d30..a9e5968 100644 (file)
@@ -1138,7 +1138,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
                                 LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \
                                 OBD_CONNECT_CHANGE_QS | \
-                                OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_RMT_CLIENT | \
+                                OBD_CONNECT_OSS_CAPA  | \
+                                OBD_CONNECT_RMT_CLIENT | \
                                 OBD_CONNECT_RMT_CLIENT_FORCE | OBD_CONNECT_VBR | \
                                 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
                                 OBD_CONNECT_GRANT_SHRINK | OBD_CONNECT_FULL20 | \
@@ -1553,8 +1554,11 @@ extern void lustre_swab_generic_32s (__u32 *val);
 #define MDS_INODELOCK_LOOKUP 0x000001       /* dentry, mode, owner, group */
 #define MDS_INODELOCK_UPDATE 0x000002       /* size, links, timestamps */
 #define MDS_INODELOCK_OPEN   0x000004       /* For opened files */
+#define MDS_INODELOCK_LAYOUT 0x000008       /* for layout */
 
-/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits */
+/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits
+ * XXX: MDS_INODELOCK_MAXSHIFT should be increased to 3 once the layout lock is
+ * supported */
 #define MDS_INODELOCK_MAXSHIFT 2
 /* This FULL lock is useful to take on unlink sort of operations */
 #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
index 965fd22..8f9d603 100644 (file)
@@ -1246,13 +1246,15 @@ struct lu_context;
 #define IT_GETXATTR (1 << 7)
 #define IT_EXEC     (1 << 8)
 #define IT_PIN      (1 << 9)
+#define IT_LAYOUT   (1 << 10)
 
 static inline int it_to_lock_mode(struct lookup_intent *it)
 {
         /* CREAT needs to be tested before open (both could be set) */
         if (it->it_op & IT_CREAT)
                 return LCK_CW;
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
+        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP |
+                              IT_LAYOUT))
                 return LCK_CR;
 
         LASSERTF(0, "Invalid it_op: %d\n", it->it_op);
index 9d95c13..fe7aac9 100644 (file)
@@ -135,6 +135,8 @@ char *ldlm_it2str(int it)
                 return "unlink";
         case IT_GETXATTR:
                 return "getxattr";
+        case IT_LAYOUT:
+                return "layout";
         default:
                 CERROR("Unknown intent %d\n", it);
                 return "UNKNOWN";
@@ -566,7 +568,8 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
                 /* Make sure all the right bits are set in this lock we
                    are going to pass to client */
                 LASSERTF(lock->l_policy_data.l_inodebits.bits ==
-                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
+                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                          MDS_INODELOCK_LAYOUT),
                          "Inappropriate inode lock bits during "
                          "conversion " LPU64 "\n",
                          lock->l_policy_data.l_inodebits.bits);
index b418240..ae3e550 100644 (file)
@@ -431,4 +431,13 @@ static inline void cl_stats_tally(struct cl_device *dev, enum cl_req_type crt,
 {
 }
 
+static inline loff_t i_size_read(struct inode *inode)
+{
+        return inode->i_stbuf.st_size;
+}
+
+static inline void i_size_write(struct inode *inode, loff_t i_sz)
+{
+        inode->i_stbuf.st_size = i_sz;
+}
 #endif
index 20a338b..a17429d 100644 (file)
@@ -1731,7 +1731,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                                 } else {
                                         /* Use old size assignment to avoid
                                          * deadlock bz14138 & bz14326 */
-                                        inode->i_size = body->size;
+                                        i_size_write(inode, body->size);
                                         lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
                                 }
                                 ldlm_lock_decref(&lockh, mode);
@@ -1739,7 +1739,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                 } else {
                         /* Use old size assignment to avoid
                          * deadlock bz14138 & bz14326 */
-                        inode->i_size = body->size;
+                        i_size_write(inode, body->size);
 
                         CDEBUG(D_VFSTRACE, "inode=%lu, updating i_size %llu\n",
                                inode->i_ino, (unsigned long long)body->size);
index d60d359..6cbf9e0 100644 (file)
@@ -410,10 +410,10 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                 unsigned long user_addr = (unsigned long)iov[seg].iov_base;
 
                 if (rw == READ) {
-                        if (file_offset >= inode->i_size)
+                        if (file_offset >= i_size_read(inode))
                                 break;
-                        if (file_offset + iov_left > inode->i_size)
-                                iov_left = inode->i_size - file_offset;
+                        if (file_offset + iov_left > i_size_read(inode))
+                                iov_left = i_size_read(inode) - file_offset;
                 }
 
                 while (iov_left > 0) {
index cd190fb..e4225e6 100644 (file)
@@ -527,7 +527,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-        if (it->it_op & (IT_LOOKUP | IT_GETATTR))
+        if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))
                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
                                        flags, reqp, cb_blocking,
                                        extra_lock_flags);
index 99c9c0c..644fa16 100644 (file)
@@ -672,7 +672,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 lmm = NULL;
         } else if (it->it_op & IT_UNLINK)
                 req = mdc_intent_unlink_pack(exp, it, op_data);
-        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
+        else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
                 req = mdc_intent_getattr_pack(exp, it, op_data);
         else if (it->it_op == IT_READDIR)
                 req = ldlm_enqueue_pack(exp);
@@ -803,7 +803,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp,
         } else if (it->it_op == IT_OPEN) {
                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
         } else {
-                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
+                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
         }
 
         /* If we already have a matching lock, then cancel the new
@@ -859,8 +859,17 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
         } else {
                 fid_build_reg_res_name(fid, &res_id);
-                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
-                                  MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
+                switch (it->it_op) {
+                case IT_GETATTR:
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                        break;
+                case IT_LAYOUT:
+                        policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
+                        break;
+                default:
+                        policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+                        break;
+                }
                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                        LDLM_FL_BLOCK_GRANTED, &res_id,
                                        LDLM_IBITS, &policy,
@@ -924,7 +933,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
 
         lockh.cookie = 0;
         if (fid_is_sane(&op_data->op_fid2) &&
-            (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
+            (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
index 021b681..6e4705e 100644 (file)
@@ -213,7 +213,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
         lh->mlh_reg_mode = lm;
         lh->mlh_type = MDT_PDO_LOCK;
 
-        if (name != NULL) {
+        if (name != NULL && (name[0] != '\0')) {
                 LASSERT(namelen > 0);
                 lh->mlh_pdo_hash = full_name_hash(name, namelen);
         } else {
@@ -897,8 +897,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         name = NULL;
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                                "ldlm_rep = %p\n",
-                               PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
-                               ldlm_rep);
+                               PFID(mdt_object_fid(parent)),
+                               PFID(&reqbody->fid2), ldlm_rep);
                 } else {
                         lname = mdt_name(info->mti_env, (char *)name, namelen);
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
@@ -969,13 +969,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
 
         if (lname) {
-                /* step 1: lock parent */
-                lhp = &info->mti_lh[MDT_LH_PARENT];
-                mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
-                rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                     MDT_LOCAL_LOCK);
-                if (unlikely(rc != 0))
-                        RETURN(rc);
+                /* step 1: lock parent only if parent is a directory */
+                if (S_ISDIR(lu_object_attr(&parent->mot_obj.mo_lu))) {
+                        lhp = &info->mti_lh[MDT_LH_PARENT];
+                        mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+                        rc = mdt_object_lock(info, parent, lhp,
+                                             MDS_INODELOCK_UPDATE,
+                                             MDT_LOCAL_LOCK);
+                        if (unlikely(rc != 0))
+                                RETURN(rc);
+                }
 
                 /* step 2: lookup child's fid by name */
                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
@@ -983,7 +986,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
                 if (rc != 0) {
                         if (rc == -ENOENT)
-                                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                                mdt_set_disposition(info, ldlm_rep,
+                                                    DISP_LOOKUP_NEG);
                         GOTO(out_parent, rc);
                 } else
                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
@@ -1031,7 +1035,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 relock:
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, LCK_PR);
+                if (child_bits == MDS_INODELOCK_LAYOUT)
+                        mdt_lock_reg_init(lhc, LCK_CR);
+                else
+                        mdt_lock_reg_init(lhc, LCK_PR);
 
                 if (!(child_bits & MDS_INODELOCK_UPDATE)) {
                         struct md_attr *ma = &info->mti_attr;
@@ -1043,6 +1050,12 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
+                        /* layout lock is used only on regular files */
+                        if ((ma->ma_valid & MA_INODE) &&
+                            (ma->ma_attr.la_valid & LA_MODE) &&
+                            !S_ISREG(ma->ma_attr.la_mode))
+                                child_bits &= ~MDS_INODELOCK_LAYOUT;
+
                         /* If the file has not been changed for some time, we
                          * return not only a LOOKUP lock, but also an UPDATE
                          * lock and this might save us RPC on later STAT. For
@@ -3082,6 +3095,7 @@ enum mdt_it_code {
         MDT_IT_UNLINK,
         MDT_IT_TRUNC,
         MDT_IT_GETXATTR,
+        MDT_IT_LAYOUT,
         MDT_IT_NR
 };
 
@@ -3152,6 +3166,11 @@ static struct mdt_it_flavor {
                 .it_fmt   = NULL,
                 .it_flags = 0,
                 .it_act   = NULL
+        },
+        [MDT_IT_LAYOUT] = {
+                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_getattr
         }
 };
 
@@ -3327,8 +3346,18 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         case MDT_IT_GETATTR:
                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
+        case MDT_IT_LAYOUT: {
+                static int printed = 0;
+
+                if (!printed) {
+                        CERROR("layout lock not supported by this version\n");
+                        printed = 1;
+                }
+                GOTO(out_shrink, rc = -EINVAL);
+                break;
+        }
         default:
-                CERROR("Unhandled till now");
+                CERROR("Unsupported intent (%d)\n", opcode);
                 GOTO(out_shrink, rc = -EINVAL);
         }
 
@@ -3483,6 +3512,9 @@ static int mdt_intent_code(long itcode)
         case IT_GETXATTR:
                 rc = MDT_IT_GETXATTR;
                 break;
+        case IT_LAYOUT:
+                rc = MDT_IT_LAYOUT;
+                break;
         default:
                 CERROR("Unknown intent opcode: %ld\n", itcode);
                 rc = -EINVAL;
index 9da2405..2fb154c 100644 (file)
@@ -85,7 +85,7 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, struct lu_fid *parent,
                 newvalid |= OBD_MD_FLCTIME;
         }
         if (valid & OBD_MD_FLSIZE) {
-                dst->o_size = src->i_size;
+                dst->o_size = i_size_read(src);
                 newvalid |= OBD_MD_FLSIZE;
         }
         if (valid & OBD_MD_FLBLOCKS) {  /* allocation of space (x512 bytes) */
index a31ecf3..f61d613 100644 (file)
@@ -3908,7 +3908,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                         } else {
                                 /* Use these existing objects if they are
                                  * zero length. */
-                                if (dchild->d_inode->i_size == 0) {
+                                if (i_size_read(dchild->d_inode) == 0) {
                                         rc = filter_use_existing_obj(obd,dchild,
                                                       &handle, &cleanup_phase);
                                         if (rc == 0)
index ac6c9d8..0e07afe 100644 (file)
@@ -1792,6 +1792,8 @@ void lustre_assert_wire_constants(void)
                  MDS_INODELOCK_UPDATE);
         LASSERTF(MDS_INODELOCK_OPEN == 0x000004, "found 0x%.8x\n",
                  MDS_INODELOCK_OPEN);
+        LASSERTF(MDS_INODELOCK_LAYOUT == 0x000008, "found 0x%.8x\n",
+                 MDS_INODELOCK_LAYOUT);
 
         /* Checks for struct mdt_ioepoch */
         LASSERTF((int)sizeof(struct mdt_ioepoch) == 24, "found %lld\n",
index f5b331e..fea3106 100644 (file)
@@ -795,6 +795,7 @@ check_mdt_body(void)
         CHECK_DEFINE_X(MDS_INODELOCK_LOOKUP);
         CHECK_DEFINE_X(MDS_INODELOCK_UPDATE);
         CHECK_DEFINE_X(MDS_INODELOCK_OPEN);
+        CHECK_DEFINE_X(MDS_INODELOCK_LAYOUT);
 }
 
 static void
index 7d10667..1de4005 100644 (file)
@@ -1790,6 +1790,8 @@ void lustre_assert_wire_constants(void)
                  MDS_INODELOCK_UPDATE);
         LASSERTF(MDS_INODELOCK_OPEN == 0x000004, "found 0x%.8x\n",
                  MDS_INODELOCK_OPEN);
+        LASSERTF(MDS_INODELOCK_LAYOUT == 0x000008, "found 0x%.8x\n",
+                 MDS_INODELOCK_LAYOUT);
 
         /* Checks for struct mdt_ioepoch */
         LASSERTF((int)sizeof(struct mdt_ioepoch) == 24, "found %lld\n",