Whamcloud - gitweb
LU-11158 mdt: grow lvb buffer to hold layout 49/34049/4
authorBobi Jam <bobijam@whamcloud.com>
Thu, 19 Jul 2018 15:19:43 +0000 (23:19 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 15 Feb 2019 01:28:35 +0000 (01:28 +0000)
A write intent RPC could generate a layout bigger than the initial
mdt_max_mdsize, so that the new layout cannot be returned to the client.
This patch fixes the issue as follows:

* fix a glitch in lod_use_defined_striping(), where v3 should be
  updated along with v1.
* change lvbo_fill() to return -ERANGE in this case, and to store the
  needed buffer size in its @buflen parameter.
* in ldlm_handle_enqueue0(), when ldlm_lvbo_fill() returns -ERANGE,
  grow the corresponding RMF_DLM_LVB buffer and retrieve the
  layout again to refill the buffer.
* define a new MAX_MD_SIZE to hold a reasonable composite layout, and
  keep the old MAX_MD_SIZE as MAX_MD_SIZE_OLD.

lustre-review: https://review.whamcloud.com/32847
lustre-commit: e5abcf83c0575b8a79594c1eb9ea727739d91522

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: I255b954195b3e64c3edd416c0cb209df0d9fc43a
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/34049
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/lod/lod_qos.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lvb.c
lustre/ofd/ofd_lvb.c
lustre/quota/qmt_lock.c
lustre/tests/replay-single.sh
lustre/tests/sanity-pfl.sh

index 7f789c5..ba3ea45 100644 (file)
@@ -1062,7 +1062,11 @@ struct lov_mds_md_v1 {            /* LOV EA mds/wire data (little-endian) */
        struct lov_ost_data_v1 lmm_objects[0]; /* per-stripe data */
 };
 
-#define MAX_MD_SIZE (sizeof(struct lov_mds_md) + 4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE_OLD (sizeof(struct lov_mds_md) +                   \
+                        4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE (sizeof(struct lov_comp_md_v1) +                   \
+                    4 * (sizeof(struct lov_comp_md_entry_v1) +         \
+                         MAX_MD_SIZE_OLD))
 #define MIN_MD_SIZE (sizeof(struct lov_mds_md) + 1 * sizeof(struct lov_ost_data))
 
 /* This is the default MDT reply size allocated, should the striping be bigger,
index 2dcc2ae..7d84ff0 100644 (file)
@@ -297,7 +297,7 @@ struct ldlm_valblock_ops {
        /* Return size of lvb data appropriate RPC size can be reserved */
        int (*lvbo_size)(struct ldlm_lock *lock);
        /* Called to fill in lvb data to RPC buffer @buf */
-       int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen);
+       int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int *buflen);
 };
 
 /**
@@ -1084,7 +1084,7 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
        return 0;
 }
 
-static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
+static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int *len)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;
index 587122e..260b9a0 100644 (file)
@@ -1012,7 +1012,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        if (lvb_len > 0) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
-               lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+               lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len);
                if (lvb_len < 0) {
                        /* We still need to send the RPC to wake up the blocked
                         * enqueue thread on the client.
@@ -1460,43 +1460,59 @@ existing_lock:
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);
 
-               if (rc == 0) {
-                       if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
-                                                 RCL_SERVER) &&
-                           ldlm_lvbo_size(lock) > 0) {
-                               void *buf;
-                               int buflen;
-
-                               buf = req_capsule_server_get(&req->rq_pill,
-                                                            &RMF_DLM_LVB);
-                               LASSERTF(buf != NULL, "req %p, lock %p\n",
-                                        req, lock);
-                               buflen = req_capsule_get_size(&req->rq_pill,
-                                               &RMF_DLM_LVB, RCL_SERVER);
-                               /* non-replayed lock, delayed lvb init may
-                                * need to be occur now */
-                               if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
-                                       buflen = ldlm_lvbo_fill(lock, buf,
-                                                               buflen);
-                                       if (buflen >= 0)
-                                               req_capsule_shrink(
+               if (rc == 0 &&
+                   req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+                                         RCL_SERVER) &&
+                   ldlm_lvbo_size(lock) > 0) {
+                       void *buf;
+                       int buflen;
+
+retry:
+                       buf = req_capsule_server_get(&req->rq_pill,
+                                                    &RMF_DLM_LVB);
+                       LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
+                       buflen = req_capsule_get_size(&req->rq_pill,
+                                       &RMF_DLM_LVB, RCL_SERVER);
+                       /* non-replayed lock, delayed lvb init may
+                        * need to be occur now
+                        */
+                       if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
+                               int rc2;
+
+                               rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
+                               if (rc2 >= 0) {
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          rc2, RCL_SERVER);
+                               } else if (rc2 == -ERANGE) {
+                                       rc2 = req_capsule_server_grow(
                                                        &req->rq_pill,
-                                                       &RMF_DLM_LVB,
-                                                       buflen, RCL_SERVER);
-                                       else
-                                               rc = buflen;
-                               } else if (flags & LDLM_FL_REPLAY) {
-                                       /* no LVB resend upon replay */
-                                       if (buflen > 0)
+                                                       &RMF_DLM_LVB, buflen);
+                                       if (!rc2) {
+                                               goto retry;
+                                       } else {
+                                               /* if we can't grow the buffer,
+                                                * it's ok to return empty lvb
+                                                * to client.
+                                                */
                                                req_capsule_shrink(
                                                        &req->rq_pill,
-                                                       &RMF_DLM_LVB,
-                                                       0, RCL_SERVER);
-                                       else
-                                               rc = buflen;
+                                                       &RMF_DLM_LVB, 0,
+                                                       RCL_SERVER);
+                                       }
                                } else {
-                                       rc = buflen;
+                                       rc = rc2;
                                }
+                       } else if (flags & LDLM_FL_REPLAY) {
+                               /* no LVB resend upon replay */
+                               if (buflen > 0)
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          0, RCL_SERVER);
+                               else
+                                       rc = buflen;
+                       } else {
+                               rc = buflen;
                        }
                }
 
index 23865c0..786f87f 100644 (file)
@@ -1764,6 +1764,7 @@ int lod_use_defined_striping(const struct lu_env *env,
                if (mo->ldo_is_composite) {
                        offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
                        v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs);
+                       v3 = (struct lov_mds_md_v3 *)v1;
                        magic = le32_to_cpu(v1->lmm_magic);
 
                        ext = &comp_v1->lcm_entries[i].lcme_extent;
index 4a532f0..cb809c2 100644 (file)
@@ -613,14 +613,14 @@ static int mdc_finish_enqueue(struct obd_export *exp,
          * It's important that we do this first!  Otherwise we might exit the
          * function without doing so, and try to replay a failed create
          * (bug 3440) */
-        if (it->it_op & IT_OPEN && req->rq_replay &&
+       if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
                mdc_clear_replay_flag(req, it->it_status);
 
-       DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
+       DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
                  it->it_op, it->it_disposition, it->it_status);
 
-        /* We know what to expect, so we do any byte flipping required here */
+       /* We know what to expect, so we do any byte flipping required here */
        if (it_has_reply_body(it)) {
                 struct mdt_body *body;
 
@@ -683,6 +683,8 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                /* maybe the lock was granted right away and layout
                 * is packed into RMF_DLM_LVB of req */
                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+               CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
+                      class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
                if (lvb_len > 0) {
                        lvb_data = req_capsule_server_sized_get(pill,
                                                        &RMF_DLM_LVB, lvb_len);
index 24e821e..3173d76 100644 (file)
@@ -3625,6 +3625,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                        if (layout_size > info->mti_mdt->mdt_max_mdsize)
                                info->mti_mdt->mdt_max_mdsize = layout_size;
                }
+               CDEBUG(D_INFO, "%s: layout_size %d\n",
+                      mdt_obd_name(info->mti_mdt), layout_size);
        }
 
        /*
@@ -3696,7 +3698,7 @@ out_obj:
 out:
        lhc->mlh_reg_lh.cookie = 0;
 
-       return rc;
+       RETURN(rc);
 }
 
 static int mdt_intent_reint(enum mdt_it_code opcode,
@@ -4841,7 +4843,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
-        m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
+       m->mdt_max_mdsize = MAX_MD_SIZE_OLD;
        m->mdt_opts.mo_evict_tgt_nids = 1;
         m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
 
index 83918f7..e294fa9 100644 (file)
@@ -92,7 +92,21 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
        return 0;
 }
 
-static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+/**
+ * Implementation of ldlm_valblock_ops::lvbo_fill for MDT.
+ *
+ * This function is called to fill the given RPC buffer \a buf with LVB data
+ *
+ * \param[in] lock             LDLM lock
+ * \param[in] buf              RPC buffer to fill
+ * \param[in,out] lvblen       lvb buffer length
+ *
+ * \retval             size of LVB data written into \a buf buffer
+ *                     or -ERANGE when the provided @lvblen is not big enough,
+ *                     and the needed lvb buffer size will be returned in
+ *                     @lvblen
+ */
+static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int *lvblen)
 {
        struct lu_env env;
        struct mdt_thread_info *info;
@@ -110,7 +124,7 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 
                /* call lvbo fill function of quota master */
                rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
-                                            lvblen);
+                                            *lvblen);
                RETURN(rc);
        }
 
@@ -157,8 +171,7 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 
        if (rc > 0) {
                struct lu_buf *lmm = NULL;
-
-               if (lvblen < rc) {
+               if (*lvblen < rc) {
                        int level;
 
                        /* The layout EA may be larger than mdt_max_mdsize
@@ -173,8 +186,9 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                        }
                        CDEBUG_LIMIT(level, "%s: small buffer size %d for EA "
                                     "%d (max_mdsize %d): rc = %d\n",
-                                    mdt_obd_name(mdt), lvblen, rc,
+                                    mdt_obd_name(mdt), *lvblen, rc,
                                     info->mti_mdt->mdt_max_mdsize, -ERANGE);
+                       *lvblen = rc;
                        GOTO(out, rc = -ERANGE);
                }
 
@@ -191,7 +205,11 @@ out:
        if (obj != NULL && !IS_ERR(obj))
                mdt_object_put(&env, obj);
        lu_env_fini(&env);
-       RETURN(rc < 0 ? 0 : rc);
+
+       if (rc < 0 && rc != -ERANGE)
+               rc = 0;
+
+       RETURN(rc);
 }
 
 static int mdt_lvbo_free(struct ldlm_resource *res)
index b1d50eb..774ae73 100644 (file)
@@ -358,7 +358,7 @@ static int ofd_lvbo_size(struct ldlm_lock *lock)
  *
  * \retval             size of LVB data written into \a buf buffer
  */
-static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
+static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen)
 {
        struct ldlm_resource *res = lock->l_resource;
        int lvb_len;
@@ -370,8 +370,8 @@ static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
        lvb_len = ofd_lvbo_size(lock);
        LASSERT(lvb_len <= res->lr_lvb_len);
 
-       if (lvb_len > buflen)
-               lvb_len = buflen;
+       if (lvb_len > *buflen)
+               lvb_len = *buflen;
 
        lock_res(res);
        memcpy(buf, res->lr_lvb_data, lvb_len);
index 50fa1b3..e6008bc 100644 (file)
@@ -140,7 +140,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        /* on success, pack lvb in reply */
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
-       lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+       lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
        if (lvb_len < 0)
                GOTO(out, rc = lvb_len);
 
index f93dc81..ef10db5 100755 (executable)
@@ -4623,6 +4623,33 @@ test_120() {
 }
 run_test 120 "DNE fail abort should stop both normal and DNE replay"
 
+test_132a() {
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.9.90) ] &&
+               skip "Do not support PFL files before 2.10"
+
+       $LFS setstripe -E 1M -c 1 -E EOF -c 2 $DIR/$tfile
+       replay_barrier $SINGLEMDS
+       # write over the first component size cause next component instantiation
+       dd if=/dev/urandom of=$DIR/$tfile bs=1M count=1 seek=1 ||
+               error "dd to $DIR/$tfile failed"
+       lfs getstripe $DIR/$tfile
+
+       cksum=$(md5sum $DIR/$tfile | awk '{print $1}')
+       $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+               error "Component #1 was not instantiated"
+
+       fail $SINGLEMDS
+
+       lfs getstripe $DIR/$tfile
+       $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+               error "Component #1 instantiation was not replayed"
+       cksum2=$(md5sum $DIR/$tfile | awk '{print $1}')
+       if [ $cksum != $cksum2 ] ; then
+               error_noexit "New cksum $cksum2 does not match original $cksum"
+       fi
+}
+run_test 132a "PFL new component instantiate replay"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 0b77357..13bd30f 100644 (file)
@@ -342,7 +342,7 @@ test_9() {
        test_mkdir $DIR/$tdir
        rm -f $comp_file
 
-       $LFS setstripe -E 1m -S 1m -E 2M -c 1 $comp_file ||
+       $LFS setstripe -E 1M -S 1M -E -1 -c 1 $comp_file ||
                error "Create $comp_file failed"
 
        local comp_cnt=$($LFS getstripe --component-count $comp_file)