Whamcloud - gitweb
LU-11158 mdt: grow lvb buffer to hold layout 47/32847/11
authorBobi Jam <bobijam@whamcloud.com>
Thu, 19 Jul 2018 15:19:43 +0000 (23:19 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 29 Oct 2018 15:58:17 +0000 (15:58 +0000)
A write intent RPC could generate a layout bigger than the initial
mdt_max_mdsize, so that the new layout cannot be returned to the client.
This patch fixes the issue by:

* fix a glitch in lod_use_defined_striping(), where v3 should be
  updated along with v1.
* change lvbo_fill() to return -ERANGE in this case, and store the
  needed buffer size in its @buflen parameter
* in ldlm_handle_enqueue0(), when ldlm_lvbo_fill() detects -ERANGE,
  it grows the corresponding RMF_DLM_LVB buffer and retrieves the
  layout again to refill the buffer.
* define a new MAX_MD_SIZE to hold a reasonable composite layout, and
  keep the old MAX_MD_SIZE as MAX_MD_SIZE_OLD.

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: I255b954195b3e64c3edd416c0cb209df0d9fc43a
Reviewed-on: https://review.whamcloud.com/32847
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_lockd.c
lustre/lod/lod_qos.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lvb.c
lustre/ofd/ofd_lvb.c
lustre/quota/qmt_lock.c
lustre/tests/replay-single.sh
lustre/tests/sanity-pfl.sh

index 5262a1d..738154b 100644 (file)
@@ -302,7 +302,7 @@ struct ldlm_valblock_ops {
        int (*lvbo_size)(struct ldlm_lock *lock);
        /* Called to fill in lvb data to RPC buffer @buf */
        int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock,
-                        void *buf, int buflen);
+                        void *buf, int *buflen);
 };
 
 /**
@@ -1122,7 +1122,7 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
 }
 
 static inline int ldlm_lvbo_fill(const struct lu_env *env,
-                                struct ldlm_lock *lock, void *buf, int len)
+                                struct ldlm_lock *lock, void *buf, int *len)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;
index 8dafb71..a4ecd35 100644 (file)
@@ -1138,7 +1138,11 @@ struct lov_mds_md_v1 {            /* LOV EA mds/wire data (little-endian) */
        struct lov_ost_data_v1 lmm_objects[0]; /* per-stripe data */
 };
 
-#define MAX_MD_SIZE (sizeof(struct lov_mds_md) + 4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE_OLD (sizeof(struct lov_mds_md) +                   \
+                        4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE (sizeof(struct lov_comp_md_v1) +                   \
+                    4 * (sizeof(struct lov_comp_md_entry_v1) +         \
+                         MAX_MD_SIZE_OLD))
 #define MIN_MD_SIZE (sizeof(struct lov_mds_md) + 1 * sizeof(struct lov_ost_data))
 
 /* This is the default MDT reply size allocated, should the striping be bigger,
index 486f53b..5789e8c 100644 (file)
@@ -1003,7 +1003,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                if (req->rq_svc_thread)
                        env = req->rq_svc_thread->t_env;
 
-               lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len);
+               lvb_len = ldlm_lvbo_fill(env, lock, lvb, &lvb_len);
                if (lvb_len < 0) {
                        /* We still need to send the RPC to wake up the blocked
                         * enqueue thread on the client.
@@ -1409,43 +1409,59 @@ existing_lock:
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);
 
-               if (rc == 0) {
-                       if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
-                                                 RCL_SERVER) &&
-                           ldlm_lvbo_size(lock) > 0) {
-                               void *buf;
-                               int buflen;
-
-                               buf = req_capsule_server_get(&req->rq_pill,
-                                                            &RMF_DLM_LVB);
-                               LASSERTF(buf != NULL, "req %p, lock %p\n",
-                                        req, lock);
-                               buflen = req_capsule_get_size(&req->rq_pill,
-                                               &RMF_DLM_LVB, RCL_SERVER);
-                               /* non-replayed lock, delayed lvb init may
-                                * need to be occur now */
-                               if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
-                                       buflen = ldlm_lvbo_fill(env, lock, buf,
-                                                               buflen);
-                                       if (buflen >= 0)
-                                               req_capsule_shrink(
+               if (rc == 0 &&
+                   req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+                                         RCL_SERVER) &&
+                   ldlm_lvbo_size(lock) > 0) {
+                       void *buf;
+                       int buflen;
+
+retry:
+                       buf = req_capsule_server_get(&req->rq_pill,
+                                                    &RMF_DLM_LVB);
+                       LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
+                       buflen = req_capsule_get_size(&req->rq_pill,
+                                       &RMF_DLM_LVB, RCL_SERVER);
+                       /* non-replayed lock, delayed lvb init may
+                        * need to be occur now
+                        */
+                       if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
+                               int rc2;
+
+                               rc2 = ldlm_lvbo_fill(env, lock, buf, &buflen);
+                               if (rc2 >= 0) {
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          rc2, RCL_SERVER);
+                               } else if (rc2 == -ERANGE) {
+                                       rc2 = req_capsule_server_grow(
                                                        &req->rq_pill,
-                                                       &RMF_DLM_LVB,
-                                                       buflen, RCL_SERVER);
-                                       else
-                                               rc = buflen;
-                               } else if (flags & LDLM_FL_REPLAY) {
-                                       /* no LVB resend upon replay */
-                                       if (buflen > 0)
+                                                       &RMF_DLM_LVB, buflen);
+                                       if (!rc2) {
+                                               goto retry;
+                                       } else {
+                                               /* if we can't grow the buffer,
+                                                * it's ok to return empty lvb
+                                                * to client.
+                                                */
                                                req_capsule_shrink(
                                                        &req->rq_pill,
-                                                       &RMF_DLM_LVB,
-                                                       0, RCL_SERVER);
-                                       else
-                                               rc = buflen;
+                                                       &RMF_DLM_LVB, 0,
+                                                       RCL_SERVER);
+                                       }
                                } else {
-                                       rc = buflen;
+                                       rc = rc2;
                                }
+                       } else if (flags & LDLM_FL_REPLAY) {
+                               /* no LVB resend upon replay */
+                               if (buflen > 0)
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          0, RCL_SERVER);
+                               else
+                                       rc = buflen;
+                       } else {
+                               rc = buflen;
                        }
                }
 
index d425460..f840150 100644 (file)
@@ -1856,6 +1856,7 @@ int lod_use_defined_striping(const struct lu_env *env,
                if (mo->ldo_is_composite) {
                        offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
                        v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs);
+                       v3 = (struct lov_mds_md_v3 *)v1;
                        magic = le32_to_cpu(v1->lmm_magic);
 
                        ext = &comp_v1->lcm_entries[i].lcme_extent;
index e4abed2..dc42f57 100644 (file)
@@ -619,14 +619,14 @@ static int mdc_finish_enqueue(struct obd_export *exp,
          * It's important that we do this first!  Otherwise we might exit the
          * function without doing so, and try to replay a failed create
          * (bug 3440) */
-        if (it->it_op & IT_OPEN && req->rq_replay &&
+       if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
                mdc_clear_replay_flag(req, it->it_status);
 
-       DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
+       DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
                  it->it_op, it->it_disposition, it->it_status);
 
-        /* We know what to expect, so we do any byte flipping required here */
+       /* We know what to expect, so we do any byte flipping required here */
        if (it_has_reply_body(it)) {
                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                 if (body == NULL) {
@@ -687,6 +687,8 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                /* maybe the lock was granted right away and layout
                 * is packed into RMF_DLM_LVB of req */
                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+               CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
+                      class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
                if (lvb_len > 0) {
                        lvb_data = req_capsule_server_sized_get(pill,
                                                        &RMF_DLM_LVB, lvb_len);
index 5bf3306..d94fbbc 100644 (file)
@@ -3793,6 +3793,8 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
                        if (layout_size > info->mti_mdt->mdt_max_mdsize)
                                info->mti_mdt->mdt_max_mdsize = layout_size;
                }
+               CDEBUG(D_INFO, "%s: layout_size %d\n",
+                      mdt_obd_name(info->mti_mdt), layout_size);
        }
 
        /*
@@ -3864,7 +3866,7 @@ out_obj:
 out:
        lhc->mlh_reg_lh.cookie = 0;
 
-       return rc;
+       RETURN(rc);
 }
 
 static int mdt_intent_open(enum ldlm_intent_flags it_opc,
@@ -5032,7 +5034,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        obd = class_name2obd(dev);
        LASSERT(obd != NULL);
 
-       m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
+       m->mdt_max_mdsize = MAX_MD_SIZE_OLD;
        m->mdt_opts.mo_evict_tgt_nids = 1;
        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
 
index 9ee1989..90168e3 100644 (file)
@@ -306,8 +306,23 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
        return 0;
 }
 
+/**
+ * Implementation of ldlm_valblock_ops::lvbo_fill for MDT.
+ *
+ * This function is called to fill the given RPC buffer \a buf with LVB data
+ *
+ * \param[in] env              execution environment
+ * \param[in] lock             LDLM lock
+ * \param[in] buf              RPC buffer to fill
+ * \param[in,out] lvblen       lvb buffer length
+ *
+ * \retval             size of LVB data written into \a buf buffer
+ *                     or -ERANGE when the provided @lvblen is not big enough,
+ *                     and the needed lvb buffer size will be returned in
+ *                     @lvblen
+ */
 static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
-                        void *lvb, int lvblen)
+                        void *lvb, int *lvblen)
 {
        struct mdt_thread_info *info;
        struct mdt_device *mdt;
@@ -324,7 +339,7 @@ static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
 
                /* call lvbo fill function of quota master */
                rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
-                                            lvblen);
+                                            *lvblen);
                RETURN(rc);
        }
 
@@ -355,8 +370,8 @@ static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
                        mdt_dom_lvbo_update(env, lock->l_resource,
                                            lock, NULL, 0);
 
-               if (lvb_len > lvblen)
-                       lvb_len = lvblen;
+               if (lvb_len > *lvblen)
+                       lvb_len = *lvblen;
 
                lock_res(res);
                memcpy(lvb, res->lr_lvb_data, lvb_len);
@@ -388,7 +403,7 @@ static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
                GOTO(out_put, rc);
        if (rc > 0) {
                struct lu_buf *lmm = NULL;
-               if (lvblen < rc) {
+               if (*lvblen < rc) {
                        int level;
 
                        /* The layout EA may be larger than mdt_max_mdsize
@@ -403,8 +418,9 @@ static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
                        }
                        CDEBUG_LIMIT(level, "%s: small buffer size %d for EA "
                                     "%d (max_mdsize %d): rc = %d\n",
-                                    mdt_obd_name(mdt), lvblen, rc,
+                                    mdt_obd_name(mdt), *lvblen, rc,
                                     info->mti_mdt->mdt_max_mdsize, -ERANGE);
+                       *lvblen = rc;
                        GOTO(out_put, rc = -ERANGE);
                }
                lmm = &info->mti_buf;
@@ -419,7 +435,10 @@ out_put:
        if (obj != NULL && !IS_ERR(obj))
                mdt_object_put(env, obj);
 out:
-       RETURN(rc < 0 ? 0 : rc);
+       if (rc < 0 && rc != -ERANGE)
+               rc = 0;
+
+       RETURN(rc);
 }
 
 static int mdt_lvbo_free(struct ldlm_resource *res)
index aecd1b6..a48b4d5 100644 (file)
@@ -345,6 +345,7 @@ static int ofd_lvbo_size(struct ldlm_lock *lock)
  *
  * This function is called to fill the given RPC buffer \a buf with LVB data
  *
+ * \param[in] env      execution environment
  * \param[in] lock     LDLM lock
  * \param[in] buf      RPC buffer to fill
  * \param[in] buflen   buffer length
@@ -352,7 +353,7 @@ static int ofd_lvbo_size(struct ldlm_lock *lock)
  * \retval             size of LVB data written into \a buf buffer
  */
 static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
-                        void *buf, int buflen)
+                        void *buf, int *buflen)
 {
        struct ldlm_resource *res = lock->l_resource;
        int lvb_len;
@@ -364,8 +365,8 @@ static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
        lvb_len = ofd_lvbo_size(lock);
        LASSERT(lvb_len <= res->lr_lvb_len);
 
-       if (lvb_len > buflen)
-               lvb_len = buflen;
+       if (lvb_len > *buflen)
+               lvb_len = *buflen;
 
        lock_res(res);
        memcpy(buf, res->lr_lvb_data, lvb_len);
index f29db18..9f68426 100644 (file)
@@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        /* on success, pack lvb in reply */
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
-       lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, lvb_len);
+       lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, &lvb_len);
        if (lvb_len < 0)
                GOTO(out, rc = lvb_len);
 
index d261265..32838a9 100755 (executable)
@@ -4758,6 +4758,33 @@ test_131b() {
 }
 run_test 131b "DoM file write replay"
 
+test_132a() {
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.9.90) ] &&
+               skip "Do not support PFL files before 2.10"
+
+       $LFS setstripe -E 1M -c 1 -E EOF -c 2 $DIR/$tfile
+       replay_barrier $SINGLEMDS
+       # write over the first component size cause next component instantiation
+       dd if=/dev/urandom of=$DIR/$tfile bs=1M count=1 seek=1 ||
+               error "dd to $DIR/$tfile failed"
+       lfs getstripe $DIR/$tfile
+
+       cksum=$(md5sum $DIR/$tfile | awk '{print $1}')
+       $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+               error "Component #1 was not instantiated"
+
+       fail $SINGLEMDS
+
+       lfs getstripe $DIR/$tfile
+       $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+               error "Component #1 instantiation was not replayed"
+       cksum2=$(md5sum $DIR/$tfile | awk '{print $1}')
+       if [ $cksum != $cksum2 ] ; then
+               error_noexit "New cksum $cksum2 does not match original $cksum"
+       fi
+}
+run_test 132a "PFL new component instantiate replay"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index e27c39e..f78bc3f 100644 (file)
@@ -27,11 +27,6 @@ if [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.9.51) ]]; then
        skip_env "Need MDS version at least 2.9.51"
 fi
 
-if [ $MDSCOUNT -eq 1 ]; then
-       # Bug number:    LU-10686
-       ALWAYS_EXCEPT+=" 9"
-fi
-
 [ "$ALWAYS_EXCEPT$EXCEPT" ] &&
        echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT"
 
@@ -350,7 +345,7 @@ test_9() {
        test_mkdir $DIR/$tdir
        rm -f $comp_file
 
-       $LFS setstripe -E 1M -S 1M -E 2M -c 1 $comp_file ||
+       $LFS setstripe -E 1M -S 1M -E -1 -c 1 $comp_file ||
                error "Create $comp_file failed"
 
        local comp_cnt=$($LFS getstripe --component-count $comp_file)