int (*do_layout_change)(const struct lu_env *env, struct dt_object *dt,
struct md_layout_change *mlc,
struct thandle *th);
+
+ /**
+ * Check whether the file is in PCC-RO state.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt DT object
+ * \param[in] layout data structure to describe the changes to
+ * the DT object's layout
+ *
+ * \retval 0 success
+ * \retval -ne -EALREADY if the file is already PCC-RO cached;
+ * Otherwise, return error code
+ */
+ int (*do_layout_pccro_check)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct md_layout_change *mlc);
};
enum dt_bufs_type {
return o->do_ops->do_layout_change(env, o, mlc, th);
}
+static inline int dt_layout_pccro_check(const struct lu_env *env,
+ struct dt_object *o,
+ struct md_layout_change *mlc)
+{
+ LASSERT(o);
+ LASSERT(o->do_ops);
+ LASSERT(o->do_ops->do_layout_pccro_check);
+ return o->do_ops->do_layout_pccro_check(env, o, mlc);
+}
+
struct dt_find_hint {
struct lu_fid *dfh_fid;
struct dt_device *dfh_dt;
int (*moo_layout_change)(const struct lu_env *env,
struct md_object *obj,
struct md_layout_change *layout);
+ /**
+ * Check whether the file is in PCC-RO state.
+ */
+ int (*moo_layout_pccro_check)(const struct lu_env *env,
+ struct md_object *obj,
+ struct md_layout_change *layout);
};
/**
return m->mo_ops->moo_layout_change(env, m, layout);
}
+static inline int mo_layout_pccro_check(const struct lu_env *env,
+ struct md_object *m,
+ struct md_layout_change *layout)
+{
+ LASSERT(m->mo_ops->moo_layout_pccro_check);
+ return m->mo_ops->moo_layout_pccro_check(env, m, layout);
+}
+
static inline int mo_swap_layouts(const struct lu_env *env,
struct md_object *o1, struct md_object *o2,
__u64 dv1, __u64 dv2, __u64 flags)
#define OBD_FAIL_MDS_LOD_CREATE_PAUSE 0x173
#define OBD_FAIL_MDS_CONNECT_VS_EVICT 0x174
#define OBD_FAIL_MDS_DELAY_OPEN 0x175
+#define OBD_FAIL_MDS_LL_PCCRO 0x176
/* CMD */
#define OBD_FAIL_MDS_IS_SUBDIR_NET 0x180
RETURN(-EOPNOTSUPP);
rc = pcc_attach_allowed_check(inode);
- if (rc)
+ if (rc) {
+ CDEBUG(D_CACHE,
+ "PCC-RO caching for "DFID" not allowed, rc = %d\n",
+ PFID(ll_inode2fid(inode)), rc);
RETURN(rc);
+ }
rc = pcc_layout_rdonly_set(inode, &gen);
if (rc)
pcc_inode_lock(inode);
old_cred = override_creds(super->pccs_cred);
lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
- if (gen != ll_layout_version_get(lli))
+ if (gen != ll_layout_version_get(lli)) {
+ CDEBUG(D_CACHE, "L.Gen mismatch %u:%u\n",
+ gen, ll_layout_version_get(lli));
GOTO(out_put_unlock, rc = -ESTALE);
+ }
pcci = ll_i2pcci(inode);
if (!pcci) {
return need_sync ? 0 : -EALREADY;
}
+static int lod_layout_pccro_check(const struct lu_env *env,
+ struct dt_object *dt,
+ struct md_layout_change *mlc)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc;
+
+ rc = lod_striping_load(env, lo);
+ if (rc)
+ return rc;
+
+ return lo->ldo_flr_state & LCM_FL_PCC_RDONLY ? -EALREADY : 0;
+}
+
static struct lod_layout_component *
lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id)
{
.do_invalidate = lod_invalidate,
.do_declare_layout_change = lod_declare_layout_change,
.do_layout_change = lod_layout_change,
+ .do_layout_pccro_check = lod_layout_pccro_check,
};
/**
return dt_layout_change(env, mdd_object_child(obj), mlc, handle);
}
+static inline int
+mdo_layout_pccro_check(const struct lu_env *env, struct mdd_object *obj,
+ struct md_layout_change *mlc)
+{
+ return dt_layout_pccro_check(env, mdd_object_child(obj), mlc);
+}
+
static inline
int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
const struct lu_fid *fid, __u32 type,
/* Update the layout for PCC-RO. */
static int
+mdd_layout_pccro_check(const struct lu_env *env, struct md_object *o,
+ struct md_layout_change *mlc)
+{
+ return mdo_layout_pccro_check(env, md2mdd_obj(o), mlc);
+}
+
+/**
+ * Update the layout for PCC-RO.
+ */
+static int
mdd_layout_update_pccro(const struct lu_env *env, struct md_object *o,
struct md_layout_change *mlc)
{
.moo_object_lock = mdd_object_lock,
.moo_object_unlock = mdd_object_unlock,
.moo_layout_change = mdd_layout_change,
+ .moo_layout_pccro_check = mdd_layout_pccro_check,
};
rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX);
if (rc)
RETURN(rc);
+
+ CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_PCCRO, cfs_fail_val);
}
mutex_lock(&obj->mot_som_mutex);
return rc;
}
+static int mdt_layout_change_pccro(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ struct md_layout_change *layout)
+{
+ int rc;
+
+ ENTRY;
+
+ if (!mdt_object_exists(obj))
+ RETURN(-ENOENT);
+
+ if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+ RETURN(-EINVAL);
+
+ rc = mdt_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT, LCK_CR);
+ if (rc)
+ RETURN(rc);
+
+ rc = mo_layout_pccro_check(info->mti_env,
+ mdt_object_child(obj), layout);
+ if (rc == -EALREADY)
+ RETURN(0);
+
+ mdt_object_unlock(info, obj, lhc, 1);
+ rc = mdt_layout_change(info, obj, lhc, layout);
+ RETURN(rc);
+}
+
static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
mdt_intent_fixup_resent(info, *lockp, lhc, flags);
(*lockp)->l_lvb_type = LVB_T_LAYOUT;
- /*
- * Instantiate some layout components, if @buf contains lovea, then it's
- * a replay of the layout intent write RPC.
- */
- rc = mdt_layout_change(info, obj, lhc, &layout);
+ if (intent->lai_opc == LAYOUT_INTENT_PCCRO_SET)
+ /*
+ * Take a CR layout lock against the file object first to check
+ * whether the file is already PCC-RO cached. If so, return
+ * immediately; Otherwise, take an EX layout lock on the file
+ * to update the FLR PCC-RO state accordingly. By this check,
+ * it can avoid heavy lock contention and unnecessary revocation
+ * of the layout lock granted to the other clients when multiple
+ * processes from many clients perform read-only attach on a
+ * shared file object simultaneously.
+ */
+ rc = mdt_layout_change_pccro(info, obj, lhc, &layout);
+ else
+ /*
+ * Instantiate some layout components, if @buf contains lovea,
+ * then it's a replay of the layout intent write RPC.
+ */
+ rc = mdt_layout_change(info, obj, lhc, &layout);
+
ldlm_rep->lock_policy_res2 = clear_serious(rc);
if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
stack_trap "do_facet $facet rm -rf $mntpt" EXIT
do_facet $facet dd if=/dev/zero of=$file bs=1M count=$size
stack_trap "do_facet $facet rm -f $file" EXIT
+ do_facet $facet mount
+ do_facet $facet $UMOUNT $mntpt
+ do_facet $facet mount
do_facet $facet mkfs.ext4 $file ||
error "mkfs.ext4 $file failed"
do_facet $facet file $file
}
run_test 36b "Stale RO-PCC copy should be deleted after remove the PCC backend"
+test_37() {
+ local loopfile="$TMP/$tfile"
+ local loopfile2="$TMP/$tfile.2"
+ local mntpt="/mnt/pcc.$tdir"
+ local mntpt2="/mnt/pcc.$tdir.2"
+ local file=$DIR/$tdir/$tfile
+ local file2=$DIR2/$tdir/$tfile
+
+ $LCTL get_param -n mdc.*.connect_flags | grep -q pcc_ro ||
+ skip "Server does not support PCC-RO"
+
+ mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+ touch $file
+
+ setup_loopdev client $loopfile $mntpt 50
+ setup_loopdev client $loopfile2 $mntpt2 50
+ $LCTL pcc add $MOUNT $mntpt -p \
+ "projid={2} roid=$HSM_ARCHIVE_NUMBER auto_attach=0 pccro=1" ||
+ error "failed to config PCC for $MOUNT $mntpt"
+ $LCTL pcc add $MOUNT2 $mntpt2 -p \
+ "projid={2} roid=$HSM_ARCHIVE_NUMBER auto_attach=0 pccro=1" ||
+ error "failed to config PCC for $MOUNT2 $mntpt2"
+ $LCTL pcc list $MOUNT
+ $LCTL pcc list $MOUNT2
+
+ cancel_lru_locks mdc
+#define CFS_FAIL_ONCE | OBD_FAIL_MDS_LL_PCCRO
+ $LCTL set_param -n fail_loc=0x80000176 fail_val=10
+ $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file &
+ sleep 2
+ $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file2
+ wait
+ $LFS pcc state $file
+ $LFS pcc state $file2
+
+ check_lpcc_state $file "readonly" client
+ check_lpcc_state $file2 "readonly" client
+
+ $LCTL pcc clear $MOUNT
+ $LCTL pcc clear $MOUNT2
+}
+run_test 37 "Multiple readers on a shared file with PCC-RO mode"
+
test_41() {
local loopfile="$TMP/$tfile"
local mntpt="/mnt/pcc.$tdir"