int (*lvbo_size)(struct ldlm_lock *lock);
/* Called to fill in lvb data to RPC buffer @buf */
int (*lvbo_fill)(const struct lu_env *env, struct ldlm_lock *lock,
- void *buf, int buflen);
+ void *buf, int *buflen);
};
/**
}
static inline int ldlm_lvbo_fill(const struct lu_env *env,
- struct ldlm_lock *lock, void *buf, int len)
+ struct ldlm_lock *lock, void *buf, int *len)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
int rc;
struct lov_ost_data_v1 lmm_objects[0]; /* per-stripe data */
};
-#define MAX_MD_SIZE (sizeof(struct lov_mds_md) + 4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE_OLD (sizeof(struct lov_mds_md) + \
+ 4 * sizeof(struct lov_ost_data))
+#define MAX_MD_SIZE (sizeof(struct lov_comp_md_v1) + \
+ 4 * (sizeof(struct lov_comp_md_entry_v1) + \
+ MAX_MD_SIZE_OLD))
#define MIN_MD_SIZE (sizeof(struct lov_mds_md) + 1 * sizeof(struct lov_ost_data))
/* This is the default MDT reply size allocated, should the striping be bigger,
if (req->rq_svc_thread)
env = req->rq_svc_thread->t_env;
- lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len);
+ lvb_len = ldlm_lvbo_fill(env, lock, lvb, &lvb_len);
if (lvb_len < 0) {
/* We still need to send the RPC to wake up the blocked
* enqueue thread on the client.
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- if (rc == 0) {
- if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
- RCL_SERVER) &&
- ldlm_lvbo_size(lock) > 0) {
- void *buf;
- int buflen;
-
- buf = req_capsule_server_get(&req->rq_pill,
- &RMF_DLM_LVB);
- LASSERTF(buf != NULL, "req %p, lock %p\n",
- req, lock);
- buflen = req_capsule_get_size(&req->rq_pill,
- &RMF_DLM_LVB, RCL_SERVER);
- /* non-replayed lock, delayed lvb init may
- * need to be occur now */
- if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
- buflen = ldlm_lvbo_fill(env, lock, buf,
- buflen);
- if (buflen >= 0)
- req_capsule_shrink(
+ if (rc == 0 &&
+ req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER) &&
+ ldlm_lvbo_size(lock) > 0) {
+ void *buf;
+ int buflen;
+
+retry:
+ buf = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
+ LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
+ buflen = req_capsule_get_size(&req->rq_pill,
+ &RMF_DLM_LVB, RCL_SERVER);
+ /* non-replayed lock, delayed lvb init may
+ * need to be occur now
+ */
+ if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
+ int rc2;
+
+ rc2 = ldlm_lvbo_fill(env, lock, buf, &buflen);
+ if (rc2 >= 0) {
+ req_capsule_shrink(&req->rq_pill,
+ &RMF_DLM_LVB,
+ rc2, RCL_SERVER);
+ } else if (rc2 == -ERANGE) {
+ rc2 = req_capsule_server_grow(
&req->rq_pill,
- &RMF_DLM_LVB,
- buflen, RCL_SERVER);
- else
- rc = buflen;
- } else if (flags & LDLM_FL_REPLAY) {
- /* no LVB resend upon replay */
- if (buflen > 0)
+ &RMF_DLM_LVB, buflen);
+ if (!rc2) {
+ goto retry;
+ } else {
+ /* if we can't grow the buffer,
+ * it's ok to return empty lvb
+ * to client.
+ */
req_capsule_shrink(
&req->rq_pill,
- &RMF_DLM_LVB,
- 0, RCL_SERVER);
- else
- rc = buflen;
+ &RMF_DLM_LVB, 0,
+ RCL_SERVER);
+ }
} else {
- rc = buflen;
+ rc = rc2;
}
+ } else if (flags & LDLM_FL_REPLAY) {
+ /* no LVB resend upon replay */
+ if (buflen > 0)
+ req_capsule_shrink(&req->rq_pill,
+ &RMF_DLM_LVB,
+ 0, RCL_SERVER);
+ else
+ rc = buflen;
+ } else {
+ rc = buflen;
}
}
if (mo->ldo_is_composite) {
offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs);
+ v3 = (struct lov_mds_md_v3 *)v1;
magic = le32_to_cpu(v1->lmm_magic);
ext = &comp_v1->lcm_entries[i].lcme_extent;
* It's important that we do this first! Otherwise we might exit the
* function without doing so, and try to replay a failed create
* (bug 3440) */
- if (it->it_op & IT_OPEN && req->rq_replay &&
+ if (it->it_op & IT_OPEN && req->rq_replay &&
(!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
mdc_clear_replay_flag(req, it->it_status);
- DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
+ DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
it->it_op, it->it_disposition, it->it_status);
- /* We know what to expect, so we do any byte flipping required here */
+ /* We know what to expect, so we do any byte flipping required here */
if (it_has_reply_body(it)) {
body = req_capsule_server_get(pill, &RMF_MDT_BODY);
if (body == NULL) {
/* maybe the lock was granted right away and layout
* is packed into RMF_DLM_LVB of req */
lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+ CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
+ class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
if (lvb_len > 0) {
lvb_data = req_capsule_server_sized_get(pill,
&RMF_DLM_LVB, lvb_len);
if (layout_size > info->mti_mdt->mdt_max_mdsize)
info->mti_mdt->mdt_max_mdsize = layout_size;
}
+ CDEBUG(D_INFO, "%s: layout_size %d\n",
+ mdt_obd_name(info->mti_mdt), layout_size);
}
/*
out:
lhc->mlh_reg_lh.cookie = 0;
- return rc;
+ RETURN(rc);
}
static int mdt_intent_open(enum ldlm_intent_flags it_opc,
obd = class_name2obd(dev);
LASSERT(obd != NULL);
- m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
+ m->mdt_max_mdsize = MAX_MD_SIZE_OLD;
m->mdt_opts.mo_evict_tgt_nids = 1;
m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
return 0;
}
+/**
+ * Implementation of ldlm_valblock_ops::lvbo_fill for MDT.
+ *
+ * This function is called to fill the given RPC buffer \a buf with LVB data
+ *
+ * \param[in] env execution environment
+ * \param[in] lock LDLM lock
+ * \param[in] buf RPC buffer to fill
+ * \param[in,out] lvblen lvb buffer length
+ *
+ * \retval size of LVB data written into \a buf buffer
+ * or -ERANGE when the provided @lvblen is not big enough,
+ * and the needed lvb buffer size will be returned in
+ * @lvblen
+ */
static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
- void *lvb, int lvblen)
+ void *lvb, int *lvblen)
{
struct mdt_thread_info *info;
struct mdt_device *mdt;
/* call lvbo fill function of quota master */
rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
- lvblen);
+ *lvblen);
RETURN(rc);
}
mdt_dom_lvbo_update(env, lock->l_resource,
lock, NULL, 0);
- if (lvb_len > lvblen)
- lvb_len = lvblen;
+ if (lvb_len > *lvblen)
+ lvb_len = *lvblen;
lock_res(res);
memcpy(lvb, res->lr_lvb_data, lvb_len);
GOTO(out_put, rc);
if (rc > 0) {
struct lu_buf *lmm = NULL;
- if (lvblen < rc) {
+ if (*lvblen < rc) {
int level;
/* The layout EA may be larger than mdt_max_mdsize
}
CDEBUG_LIMIT(level, "%s: small buffer size %d for EA "
"%d (max_mdsize %d): rc = %d\n",
- mdt_obd_name(mdt), lvblen, rc,
+ mdt_obd_name(mdt), *lvblen, rc,
info->mti_mdt->mdt_max_mdsize, -ERANGE);
+ *lvblen = rc;
GOTO(out_put, rc = -ERANGE);
}
lmm = &info->mti_buf;
if (obj != NULL && !IS_ERR(obj))
mdt_object_put(env, obj);
out:
- RETURN(rc < 0 ? 0 : rc);
+ if (rc < 0 && rc != -ERANGE)
+ rc = 0;
+
+ RETURN(rc);
}
static int mdt_lvbo_free(struct ldlm_resource *res)
*
* This function is called to fill the given RPC buffer \a buf with LVB data
*
+ * \param[in] env execution environment
* \param[in] lock LDLM lock
* \param[in] buf RPC buffer to fill
* \param[in] buflen buffer length
* \retval size of LVB data written into \a buf buffer
*/
static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
- void *buf, int buflen)
+ void *buf, int *buflen)
{
struct ldlm_resource *res = lock->l_resource;
int lvb_len;
lvb_len = ofd_lvbo_size(lock);
LASSERT(lvb_len <= res->lr_lvb_len);
- if (lvb_len > buflen)
- lvb_len = buflen;
+ if (lvb_len > *buflen)
+ lvb_len = *buflen;
lock_res(res);
memcpy(buf, res->lr_lvb_data, lvb_len);
/* on success, pack lvb in reply */
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
lvb_len = ldlm_lvbo_size(*lockp);
- lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, lvb_len);
+ lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, &lvb_len);
if (lvb_len < 0)
GOTO(out, rc = lvb_len);
}
run_test 131b "DoM file write replay"
+test_132a() {
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.9.90) ] &&
+ skip "Do not support PFL files before 2.10"
+
+ $LFS setstripe -E 1M -c 1 -E EOF -c 2 $DIR/$tfile
+ replay_barrier $SINGLEMDS
+ # write over the first component size cause next component instantiation
+ dd if=/dev/urandom of=$DIR/$tfile bs=1M count=1 seek=1 ||
+ error "dd to $DIR/$tfile failed"
+ lfs getstripe $DIR/$tfile
+
+ cksum=$(md5sum $DIR/$tfile | awk '{print $1}')
+ $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+ error "Component #1 was not instantiated"
+
+ fail $SINGLEMDS
+
+ lfs getstripe $DIR/$tfile
+ $LFS getstripe -I2 $DIR/$tfile | grep -q lmm_objects ||
+ error "Component #1 instantiation was not replayed"
+ cksum2=$(md5sum $DIR/$tfile | awk '{print $1}')
+ if [ $cksum != $cksum2 ] ; then
+ error_noexit "New cksum $cksum2 does not match original $cksum"
+ fi
+}
+run_test 132a "PFL new component instantiate replay"
+
complete $SECONDS
check_and_cleanup_lustre
exit_status
skip_env "Need MDS version at least 2.9.51"
fi
-if [ $MDSCOUNT -eq 1 ]; then
- # Bug number: LU-10686
- ALWAYS_EXCEPT+=" 9"
-fi
-
[ "$ALWAYS_EXCEPT$EXCEPT" ] &&
echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT"
test_mkdir $DIR/$tdir
rm -f $comp_file
- $LFS setstripe -E 1M -S 1M -E 2M -c 1 $comp_file ||
+ $LFS setstripe -E 1M -S 1M -E -1 -c 1 $comp_file ||
error "Create $comp_file failed"
local comp_cnt=$($LFS getstripe --component-count $comp_file)