return rc;
}
+int mds_lock_mode_for_dir(struct obd_device *obd,
+ struct dentry *dentry, int mode)
+{
+ int ret_mode;
+
+ /* any dir access needs couple locks:
+ * 1) on part of dir we gonna lookup/modify in
+ * 2) on a whole dir to protect it from concurrent splitting
+ * and to flush client's cache for readdir()
+ * so, for a given mode and dentry this routine decides what
+ * lock mode to use for lock #2:
+ * 1) if caller's gonna lookup in dir then we need to protect
+ * dir from being splitted only - LCK_CR
+ * 2) if caller's gonna modify dir then we need to protect
+ * dir from being splitted and to flush cache - LCK_CW
+ * 3) if caller's gonna modify dir and that dir seems ready
+ * for splitting then we need to protect it from any
+ * type of access (lookup/modify/split) - LCK_EX -bzzz */
+
+ if (mode == LCK_PR) {
+ ret_mode = LCK_CR;
+ } else if (mode == LCK_PW) {
+ /* caller gonna modify directory.we use concurrent
+ write lock here to retract client's cache for readdir */
+ ret_mode = LCK_CW;
+ if (mds_splitting_expected(obd, dentry)) {
+ /* splitting possible. serialize any access */
+ CERROR("%s: gonna split %lu/%lu\n",
+ obd->obd_name,
+ (unsigned long) dentry->d_inode->i_ino,
+ (unsigned long) dentry->d_inode->i_generation);
+ ret_mode = LCK_EX;
+ }
+ } else {
+ CWARN("unexpected lock mode %d\n", mode);
+ ret_mode = LCK_EX;
+ }
+ return ret_mode;
+}
+
/* only valid locked dentries or errors should be returned */
struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
struct vfsmount **mnt, int lock_mode,
- struct lustre_handle *lockh,
+ struct lustre_handle *lockh, int *mode,
char *name, int namelen, __u64 lockpart)
{
struct mds_obd *mds = &obd->u.mds;
res_id.name[0] = de->d_inode->i_ino;
res_id.name[1] = de->d_inode->i_generation;
-#ifdef S_PDIROPS
lockh[1].cookie = 0;
+#ifdef S_PDIROPS
if (name && IS_PDIROPS(de->d_inode)) {
ldlm_policy_data_t cpolicy =
{ .l_inodebits = { MDS_INODELOCK_UPDATE } };
- /* lock just dir { ino, generation } to flush client cache */
- if (lock_mode == LCK_PW) {
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- res_id, LDLM_IBITS,
- &cpolicy, LCK_CW, &flags,
- mds_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, lockh + 1);
- if (rc != ELDLM_OK) {
- l_dput(de);
- RETURN(ERR_PTR(-ENOLCK));
- }
- flags = 0;
+ LASSERT(mode != NULL);
+ *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS,
+ &cpolicy, *mode, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, lockh + 1);
+ if (rc != ELDLM_OK) {
+ l_dput(de);
+ RETURN(ERR_PTR(-ENOLCK));
}
+ flags = 0;
res_id.name[2] = full_name_hash(name, namelen);
+
CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
de->d_inode->i_ino, de->d_inode->i_generation,
res_id.name[2]);
struct dentry *dparent = NULL, *dchild = NULL;
struct lvfs_ucred uc;
struct lustre_handle parent_lockh[2];
- int namesize;
+ int namesize, update_mode;
int rc = 0, cleanup_phase = 0, resent_req = 0;
char *name;
ENTRY;
obd->obd_name, (unsigned long) body->fid1.id,
(unsigned long) body->fid1.generation);
dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
- parent_lockh, NULL, 0, child_part);
+ parent_lockh, &update_mode,
+ NULL, 0, child_part);
if (IS_ERR(dchild)) {
CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
GOTO(cleanup, rc = PTR_ERR(dchild));
memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
#ifdef S_PDIROPS
if (parent_lockh[1].cookie)
- ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+ ldlm_lock_decref(parent_lockh + 1, update_mode);
#endif
cleanup_phase = 2;
goto fill_inode;
rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
parent_lockh, &dparent,
LCK_PR, MDS_INODELOCK_LOOKUP,
- name, namesize,
+ &update_mode, name, namesize,
child_lockh, &dchild, LCK_PR,
child_part);
if (rc)
#ifdef S_PDIROPS
if (parent_lockh[1].cookie != 0)
ldlm_lock_decref(parent_lockh + 1,
- LCK_CW);
+ update_mode);
#endif
}
if (dparent)