split = mds_splitting_expected(obd, dentry);
- /* it is important to check here only for MDS_NO_SPLITTABLE.
- * The reason is that MDS_NO_SPLITTABLE means dir is not splittable
- * in principle and another thread will not split it on the quiet.
- * But if we have MDS_NO_SPLIT_EXPECTED, this means, that dir may be
- * splitted anytime, but not now (forcurrent thread) and we should
- * consider that it can happen soon and go that branch which can yield
- * LCK_EX to protect from possible splitting. */
+ /*
+ * it is important to check here only for MDS_NO_SPLITTABLE. The reason
+ * is that MDS_NO_SPLITTABLE means dir is not splittable in principle
+ * and another thread will not split it on the quiet. But if we have
+ * MDS_NO_SPLIT_EXPECTED, this means, that dir may be splitted anytime,
+ * but not now (for current thread) and we should consider that it can
+ * happen soon and go that branch which can yield LCK_EX to protect from
+ * possible splitting.
+ */
if (split == MDS_NO_SPLITTABLE) {
- /* this inode won't be splitted. so we need not to protect from
- * just flush client's cache on modification */
+ /*
+ * this inode won't be splitted. so we need not to protect from
+ * just flush client's cache on modification.
+ */
ret_mode = 0;
if (mode == LCK_PW)
ret_mode = LCK_CW;
if (mode == LCK_PR) {
ret_mode = LCK_CR;
} else if (mode == LCK_PW) {
- /* caller gonna modify directory.we use concurrent
- write lock here to retract client's cache for readdir */
+ /*
+ * caller gonna modify directory.we use concurrent write
+ * lock here to retract client's cache for readdir.
+ */
ret_mode = LCK_CW;
if (split == MDS_EXPECT_SPLIT) {
- /* splitting possible. serialize any access
- * the idea is that first one seen dir is
- * splittable is given exclusive lock and
- * split directory. caller passes lock mode
- * to mds_try_to_split_dir() and splitting
- * would be done with exclusive lock only -bzzz */
- CDEBUG(D_OTHER, "%s: gonna split %u/%u\n",
+ /*
+ * splitting possible. serialize any access the
+ * idea is that first one seen dir is splittable
+ * is given exclusive lock and split
+ * directory. caller passes lock mode to
+ * mds_try_to_split_dir() and splitting would be
+ * done with exclusive lock only -bzzz.
+ */
+ CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
obd->obd_name,
- (unsigned) dentry->d_inode->i_ino,
- (unsigned) dentry->d_inode->i_generation);
+ (unsigned long)dentry->d_inode->i_ino,
+ (unsigned long)dentry->d_inode->i_generation);
ret_mode = LCK_EX;
}
}
}
/* only valid locked dentries or errors should be returned */
-struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
- struct vfsmount **mnt, int lock_mode,
- struct lustre_handle *lockh, int *mode,
- char *name, int namelen, __u64 lockpart)
+struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id,
+ struct vfsmount **mnt, int lock_mode,
+ struct lustre_handle *lockh, int *mode,
+ char *name, int namelen, __u64 lockpart)
{
- struct mds_obd *mds = &obd->u.mds;
- struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
+ struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de;
+ ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
struct ldlm_res_id res_id = { .name = {0} };
int flags = 0, rc;
- ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
-
ENTRY;
if (IS_ERR(de))
RETURN(de);
- res_id.name[0] = de->d_inode->i_ino;
- res_id.name[1] = de->d_inode->i_generation;
+ res_id.name[0] = id_fid(id);
+ res_id.name[1] = id_group(id);
lockh[1].cookie = 0;
#ifdef S_PDIROPS
if (name && IS_PDIROPS(de->d_inode)) {
res_id.name[2] = full_name_hash(name, namelen);
- CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
- de->d_inode->i_ino, de->d_inode->i_generation,
- res_id.name[2]);
+ CDEBUG(D_INFO, "take lock on "DLID4":"LPX64"\n",
+ OLID4(id), res_id.name[2]);
}
#else
#warning "No PDIROPS support in the kernel"
#endif
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
LDLM_IBITS, &policy, lock_mode, &flags,
- mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, lockh);
+ mds_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, lockh);
if (rc != ELDLM_OK) {
l_dput(de);
retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
#endif
-/* Look up an entry by inode number. */
-/* this function ONLY returns valid dget'd dentries with an initialized inode
- or errors */
-struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
- struct vfsmount **mnt)
+/* Look up an entry by inode number. This function ONLY returns valid dget'd
+ * dentries with an initialized inode or errors */
+struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
+ struct vfsmount **mnt)
{
- char fid_name[32];
- unsigned long ino = fid->id;
- __u32 generation = fid->generation;
+ char idname[32];
struct inode *inode;
struct dentry *result;
+ struct mds_obd *mds = &obd->u.mds;
+ __u32 generation = (__u32)id_gen(id);
+ unsigned long ino = (unsigned long)id_ino(id);
+
if (ino == 0)
RETURN(ERR_PTR(-ESTALE));
-
- snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
- CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
+ snprintf(idname, sizeof(idname), "0x%lx", ino);
+
+ CDEBUG(D_DENTRY, "--> mds_id2dentry: ino/gen %lu/%u, sb %p\n",
ino, generation, mds->mds_sb);
- /* under ext3 this is neither supposed to return bad inodes
- nor NULL inodes. */
- result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
+ /* under ext3 this is neither supposed to return bad inodes nor NULL
+ inodes. */
+ result = ll_lookup_one_len(idname, mds->mds_id_de,
+ strlen(idname));
if (IS_ERR(result))
RETURN(result);
/* here we disabled generation check, as root inode i_generation
* of cache mds and real mds are different. */
- if (inode->i_ino != mds->mds_rootfid.id && generation &&
- inode->i_generation != generation) {
+ if (inode->i_ino != id_ino(&mds->mds_rootid) && generation &&
+ inode->i_generation != generation) {
/* we didn't find the right inode.. */
CERROR("bad inode %lu, link: %lu ct: %d or generation %u/%u\n",
inode->i_ino, (unsigned long)inode->i_nlink,
/* Establish a connection to the MDS.
*
- * This will set up an export structure for the client to hold state data
- * about that client, like open files, the last operation number it did
- * on the server, etc.
+ * This will set up an export structure for the client to hold state data about
+ * that client, like open files, the last operation number it did on the server,
+ * etc.
*/
static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid, unsigned long connect_flags)
+ struct obd_uuid *cluuid, unsigned long flags)
{
- struct obd_export *exp;
- struct mds_export_data *med; /* */
+ struct mds_obd *mds = &obd->u.mds;
+ struct mds_export_data *med;
struct mds_client_data *mcd;
+ struct obd_export *exp;
int rc;
ENTRY;
if (!conn || !obd || !cluuid)
RETURN(-EINVAL);
- /* XXX There is a small race between checking the list and adding a
- * new connection for the same UUID, but the real threat (list
- * corruption when multiple different clients connect) is solved.
+ /* XXX There is a small race between checking the list and adding a new
+ * connection for the same UUID, but the real threat (list corruption
+ * when multiple different clients connect) is solved.
*
- * There is a second race between adding the export to the list,
- * and filling in the client data below. Hence skipping the case
- * of NULL mcd above. We should already be controlling multiple
- * connects at the client, and we can't hold the spinlock over
- * memory allocations without risk of deadlocking.
+ * There is a second race between adding the export to the list, and
+ * filling in the client data below. Hence skipping the case of NULL
+ * mcd above. We should already be controlling multiple connects at the
+ * client, and we can't hold the spinlock over memory allocations
+ * without risk of deadlocking.
*/
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
exp = class_conn2export(conn);
- LASSERT(exp);
+
+ LASSERT(exp != NULL);
med = &exp->exp_mds_data;
OBD_ALLOC(mcd, sizeof(*mcd));
if (!mcd) {
- CERROR("mds: out of memory for client data\n");
+ CERROR("%s: out of memory for client data.\n",
+ obd->obd_name);
GOTO(out, rc = -ENOMEM);
}
rc = mds_client_add(obd, &obd->u.mds, med, -1);
if (rc)
GOTO(out, rc);
-
- if (!(connect_flags & OBD_OPT_MDS_CONNECTION)) {
- struct mds_obd *mds = &obd->u.mds;
+
+ if (!(flags & OBD_OPT_MDS_CONNECTION)) {
if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
atomic_inc(&mds->mds_real_clients);
CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
if (mds->mds_lmv_name)
rc = mds_lmv_connect(obd, mds->mds_lmv_name);
}
+
EXIT;
out:
if (rc) {
OBD_FREE(mcd, sizeof(*mcd));
class_disconnect(exp, 0);
+ } else {
+ class_export_put(exp);
}
- class_export_put(exp);
-
- return rc;
+ RETURN(rc);
}
static int mds_init_export(struct obd_export *exp)
static int mds_disconnect(struct obd_export *exp, int flags)
{
+ unsigned long irqflags;
struct obd_device *obd;
struct mds_obd *mds;
- unsigned long irqflags;
int rc;
ENTRY;
- LASSERT(exp);
- class_export_get(exp);
-
+ LASSERT(exp != NULL);
obd = class_exp2obd(exp);
if (obd == NULL) {
CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
}
mds = &obd->u.mds;
+ /*
+ * suppress any inter-mds requests durring disconnecting lmv if this is
+ * detected --force mode. This is needed to avoid endless recovery.
+ */
+ if (atomic_read(&mds->mds_real_clients) > 0 &&
+ !(exp->exp_flags & OBD_OPT_REAL_CLIENT))
+ flags |= OBD_OPT_FORCE;
+
if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)
- && !atomic_read(&mds->mds_real_clients)) {
+ && !atomic_read(&mds->mds_real_clients)) {
/* there was no client at all */
mds_lmv_disconnect(obd, flags);
}
if ((exp->exp_flags & OBD_OPT_REAL_CLIENT)
- && atomic_dec_and_test(&mds->mds_real_clients)) {
+ && atomic_dec_and_test(&mds->mds_real_clients)) {
/* time to drop LMV connections */
CDEBUG(D_OTHER, "%s: last real client %s disconnected. "
"Disconnnect from LMV now\n",
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- /* Disconnect early so that clients can't keep using export */
+ /* disconnect early so that clients can't keep using export */
rc = class_disconnect(exp, flags);
ldlm_cancel_locks_for_export(exp);
spin_unlock(&svc->srv_lock);
}
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
-
- class_export_put(exp);
RETURN(rc);
}
{
struct mds_obd *mds = mds_req2mds(req);
struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size;
ENTRY;
+ size = sizeof(*body);
+
rc = lustre_pack_reply(req, 1, &size, NULL);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
CERROR("mds: out of memory for message: size=%d\n", size);
RETURN(-ENOMEM);
}
- body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
- memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+ body->valid |= OBD_MD_FID;
+ memcpy(&body->id1, &mds->mds_rootid, sizeof(body->id1));
- /* the last_committed and last_xid fields are filled in for all
- * replies already - no need to do so here also.
+ /*
+ * the last_committed and last_xid fields are filled in for all replies
+ * already - no need to do so here also.
*/
RETURN(0);
}
/* XXX layering violation! -phil */
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
- * such that mds_blocking_ast is called just before l_i_p takes the
- * ns_lock, then by the time we get the lock, we might not be the
- * correct blocking function anymore. So check, and return early, if
- * so. */
+
+ /*
+ * get this: if mds_blocking_ast is racing with mds_intent_policy, such
+ * that mds_blocking_ast is called just before l_i_p takes the ns_lock,
+ * then by the time we get the lock, we might not be the correct
+ * blocking function anymore. So check, and return early, if so.
+ */
if (lock->l_blocking_ast != mds_blocking_ast) {
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
RETURN(0);
return;
CDEBUG(D_OTHER, "squash req from 0x%llx, (%d:%d/%x)=>(%d:%d/%x)\n",
- *peernid,
- rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
+ *peernid, rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
mds->mds_squash_uid, mds->mds_squash_gid,
(rsd->rsd_cap & ~CAP_FS_MASK));
rsd->rsd_uid = mds->mds_squash_uid;
rsd->rsd_fsuid = mds->mds_squash_uid;
rsd->rsd_fsgid = mds->mds_squash_gid;
+
/* XXX should we remove all capabilities? */
rsd->rsd_cap &= ~CAP_FS_MASK;
}
static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
- struct ptlrpc_request *req,
- struct mds_body *reqbody, int reply_off)
+ struct ptlrpc_request *req, struct mds_body *reqbody,
+ int reply_off)
{
- struct mds_body *body;
struct inode *inode = dentry->d_inode;
+ struct mds_body *body;
int rc = 0;
ENTRY;
LASSERT(body != NULL); /* caller prepped reply */
if (dentry->d_flags & DCACHE_CROSS_REF) {
- CDEBUG(D_OTHER, "cross reference: %lu/%lu/%lu\n",
- (unsigned long) dentry->d_mdsnum,
- (unsigned long) dentry->d_inum,
- (unsigned long) dentry->d_generation);
- body->valid |= OBD_MD_FLID | OBD_MD_MDS;
- body->fid1.id = dentry->d_inum;
- body->fid1.mds = dentry->d_mdsnum;
- body->fid1.generation = dentry->d_generation;
+ mds_pack_dentry2body(obd, body, dentry,
+ (reqbody->valid & OBD_MD_FID));
+ CDEBUG(D_OTHER, "cross reference: "DLID4"\n",
+ OLID4(&body->id1));
RETURN(0);
}
- mds_pack_inode2fid(obd, &body->fid1, inode);
- mds_pack_inode2body(obd, body, inode);
+
+ mds_pack_inode2body(obd, body, inode, (reqbody->valid & OBD_MD_FID));
if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
(S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, body,
inode, 1);
- /* If we have LOV EA data, the OST holds size, atime, mtime */
+ /* if we have LOV EA data, the OST holds size, atime, mtime. */
if (!(body->valid & OBD_MD_FLEASIZE) &&
!(body->valid & OBD_MD_FLDIREA))
body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
OBD_MD_FLATIME | OBD_MD_FLMTIME);
} else if (S_ISLNK(inode->i_mode) &&
(reqbody->valid & OBD_MD_LINKNAME) != 0) {
- char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
- int len;
+ int len = req->rq_repmsg->buflens[reply_off + 1];
+ char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1, 0);
- LASSERT (symname != NULL); /* caller prepped reply */
- len = req->rq_repmsg->buflens[reply_off + 1];
+ LASSERT(symname != NULL); /* caller prepped reply */
- rc = inode->i_op->readlink(dentry, symname, len);
- if (rc < 0) {
- CERROR("readlink failed: %d\n", rc);
- } else if (rc != len - 1) {
- CERROR ("Unexpected readlink rc %d: expecting %d\n",
- rc, len - 1);
- rc = -EINVAL;
+ if (!inode->i_op->readlink) {
+ rc = -ENOSYS;
} else {
- CDEBUG(D_INODE, "read symlink dest %s\n", symname);
- body->valid |= OBD_MD_LINKNAME;
- body->eadatasize = rc + 1;
- symname[rc] = 0; /* NULL terminate */
- rc = 0;
+ rc = inode->i_op->readlink(dentry, symname, len);
+ if (rc < 0) {
+ CERROR("readlink failed: %d\n", rc);
+ } else if (rc != len - 1) {
+ CERROR("Unexpected readlink rc %d: expecting %d\n",
+ rc, len - 1);
+ rc = -EINVAL;
+ } else {
+ CDEBUG(D_INODE, "read symlink dest %s\n", symname);
+ body->valid |= OBD_MD_LINKNAME;
+ body->eadatasize = rc + 1;
+ symname[rc] = 0;
+ rc = 0;
+ }
}
}
GOTO(out, req->rq_status = rc);
}
- EXIT;
out:
- return(rc);
+ RETURN(rc);
}
static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
GOTO(out, req->rq_status = rc);
}
- EXIT;
out:
- return(rc);
+ RETURN(rc);
}
int mds_check_mds_num(struct obd_device *obd, struct inode* inode,
struct mea *mea = NULL;
int mea_size, rc = 0;
ENTRY;
-
+
rc = mds_get_lmv_attr(obd, inode, &mea, &mea_size);
if (rc)
RETURN(rc);
if (mea != NULL) {
- /* dir is already splitted, check if requested filename
- * should live at this MDS or at another one */
- int i;
- i = mea_name2idx(mea, name, namelen - 1);
- if (mea->mea_master != mea->mea_fids[i].mds) {
+ /*
+ * dir is already splitted, check if requested filename should
+ * live at this MDS or at another one.
+ */
+ int i = mea_name2idx(mea, name, namelen - 1);
+ if (mea->mea_master != id_group(&mea->mea_ids[i])) {
CDEBUG(D_OTHER,
- "inapropriate MDS(%d) for %s. should be %d(%d)\n",
- mea->mea_master, name, mea->mea_fids[i].mds, i);
+ "inapropriate MDS(%d) for %s. should be "
+ "%lu(%d)\n", mea->mea_master, name,
+ (unsigned long)id_group(&mea->mea_ids[i]), i);
rc = -ERESTART;
}
}
ENTRY;
LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
-
MDS_UPDATE_COUNTER((&obd->u.mds), MDS_GETATTR_NAME_COUNT);
rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
}
mds_squash_root(mds, rsd, &req->rq_peer.peer_id.nid);
- /* Swab now, before anyone looks inside the request */
+ /* swab now, before anyone looks inside the request. */
body = lustre_swab_reqbuf(req, offset, sizeof(*body),
lustre_swab_mds_body);
if (body == NULL) {
name = NULL;
LASSERT (offset == 1 || offset == 3);
- /* if requests were at offset 2, the getattr reply goes back at 1 */
if (offset == 3) {
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
reply_offset = 1;
}
#if 0
#if HAVE_LOOKUP_RAW
- /* FIXME: handle raw lookup */
if (body->valid == OBD_MD_FLID) {
struct mds_body *mds_reply;
int size = sizeof(*mds_reply);
struct inode *dir;
ino_t inum;
- dparent = mds_fid2dentry(mds, &body->fid1, NULL);
+
+ dparent = mds_id2dentry(obd, &body->id1, NULL);
if (IS_ERR(dparent)) {
rc = PTR_ERR(dparent);
GOTO(cleanup, rc);
}
- // The user requested ONLY the inode number, so do a raw lookup
+
+ /*
+ * the user requested ONLY the inode number, so do a raw lookup.
+ */
rc = lustre_pack_reply(req, 1, &size, NULL);
if (rc) {
CERROR("out of memory\n");
l_dput(dparent);
mds_reply = lustre_msg_buf(req->rq_repmsg, 0,
sizeof(*mds_reply));
- mds_reply->fid1.id = inum;
+
+ id_ino(&mds_reply->id1) = inum;
mds_reply->valid = OBD_MD_FLID;
GOTO(cleanup, rc);
}
#endif
if (resent_req == 0) {
if (name) {
- rc = mds_get_parent_child_locked(obd, mds, &body->fid1,
+ rc = mds_get_parent_child_locked(obd, mds, &body->id1,
parent_lockh, &dparent,
LCK_PR,
MDS_INODELOCK_LOOKUP,
/*FIXME: we need MDS_INODELOCK_LOOKUP or not*/
child_part &= ~MDS_INODELOCK_LOOKUP;
CDEBUG(D_OTHER, "%s: retrieve attrs for %lu/%lu\n",
- obd->obd_name, (unsigned long) body->fid1.id,
- (unsigned long) body->fid1.generation);
+ obd->obd_name, (unsigned long)id_ino(&body->id1),
+ (unsigned long)id_gen(&body->id1));
#if 0
- dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL,
- LCK_PR, parent_lockh,
- &update_mode,
- NULL, 0, child_part);
+ dchild = mds_id2locked_dentry(obd, &body->id1, NULL,
+ LCK_PR, parent_lockh,
+ &update_mode,
+ NULL, 0, child_part);
#else
- dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL,
- LCK_PR, parent_lockh,
- &update_mode,
- NULL, 0,
- MDS_INODELOCK_UPDATE);
+ dchild = mds_id2locked_dentry(obd, &body->id1, NULL,
+ LCK_PR, parent_lockh,
+ &update_mode,
+ NULL, 0,
+ MDS_INODELOCK_UPDATE);
#endif
if (IS_ERR(dchild)) {
CERROR("can't find inode: %d\n",
GOTO(cleanup, rc);
} else {
struct ldlm_lock *granted_lock;
- struct ll_fid child_fid;
- struct ldlm_resource *res;
+
DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
granted_lock = ldlm_handle2lock(child_lockh);
- LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
- body->fid1.id, body->fid1.generation,
+ LASSERTF(granted_lock != NULL, LPU64"/%lu lockh "LPX64"\n",
+ id_fid(&body->id1), (unsigned long)id_group(&body->id1),
child_lockh->cookie);
- res = granted_lock->l_resource;
- child_fid.id = res->lr_name.name[0];
- child_fid.generation = res->lr_name.name[1];
- dchild = mds_fid2dentry(mds, &child_fid, NULL);
- LASSERT(dchild);
- LDLM_LOCK_PUT(granted_lock);
+ dparent = mds_id2dentry(obd, &body->id1, NULL);
+ LASSERT(!IS_ERR(dparent));
+
+ dchild = ll_lookup_one_len(name, dparent, namesize - 1);
+ LASSERT(!IS_ERR(dchild));
+
+ l_dput(dparent);
+ resent_req = 1;
}
cleanup_phase = 2; /* dchild, dparent, locks */
fill_inode:
-
if (!DENTRY_VALID(dchild)) {
intent_set_disposition(rep, DISP_LOOKUP_NEG);
- /* in the intent case, the policy clears this error:
- the disposition is enough */
+ /*
+ * in the intent case, the policy clears this error: the
+ * disposition is enough.
+ */
rc = -ENOENT;
GOTO(cleanup, rc);
} else {
case 1:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
mds_exit_ucred(&uc);
- default: ;
}
- return rc;
+ RETURN(rc);
}
static int mds_getattr(struct ptlrpc_request *req, int offset)
{
- struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
+ struct mds_obd *mds = &obd->u.mds;
struct lvfs_run_ctxt saved;
struct dentry *de;
struct mds_req_sec_desc *rsd;
RETURN(-EFAULT);
}
- body = lustre_swab_reqbuf(req, offset, sizeof (*body),
+ body = lustre_swab_reqbuf(req, offset, sizeof(*body),
lustre_swab_mds_body);
if (body == NULL) {
CERROR ("Can't unpack body\n");
}
push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
- de = mds_fid2dentry(mds, &body->fid1, NULL);
+ de = mds_id2dentry(obd, &body->id1, NULL);
if (IS_ERR(de)) {
rc = req->rq_status = PTR_ERR(de);
GOTO(out_pop, rc);
out_pop:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
mds_exit_ucred(&uc);
- return rc;
+ RETURN(rc);
}
static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
int rc;
+ ENTRY;
spin_lock(&obd->obd_osfs_lock);
rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
spin_unlock(&obd->obd_osfs_lock);
- return rc;
+ RETURN(rc);
}
static int mds_statfs(struct ptlrpc_request *req)
GOTO(out, rc);
}
- EXIT;
out:
req->rq_status = rc;
- return 0;
+ RETURN(0);
}
static int mds_sync(struct ptlrpc_request *req, int offset)
GOTO(out, rc);
}
- if (body->fid1.id == 0) {
- /* a fid of zero is taken to mean "sync whole filesystem" */
+ if (id_ino(&body->id1) == 0) {
+ /* an id of zero is taken to mean "sync whole filesystem" */
rc = fsfilt_sync(obd, mds->mds_sb);
if (rc)
GOTO(out, rc);
struct file *file = mds->mds_rcvd_filp;
struct dentry *de;
- de = mds_fid2dentry(mds, &body->fid1, NULL);
+ de = mds_id2dentry(obd, &body->id1, NULL);
if (IS_ERR(de))
GOTO(out, rc = PTR_ERR(de));
rc = file->f_op->fsync(NULL, de, 1);
- l_dput(de);
if (rc)
GOTO(out, rc);
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
- mds_pack_inode2fid(obd, &body->fid1, de->d_inode);
- mds_pack_inode2body(obd, body, de->d_inode);
+ mds_pack_inode2body(obd, body, de->d_inode, 0);
+ l_dput(de);
}
out:
req->rq_status = rc;
- return 0;
+ RETURN(0);
}
/* mds_readpage does not take a DLM lock on the inode, because the client must
}
push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
- de = mds_fid2dentry(mds, &body->fid1, &mnt);
+ de = mds_id2dentry(obd, &body->id1, &mnt);
if (IS_ERR(de))
GOTO(out_pop, rc = PTR_ERR(de));
RETURN(0);
}
+/* update master MDS ID, which is stored in local inode EA. */
+int mds_update_mid(struct obd_device *obd, struct lustre_id *id,
+ void *data, int data_len)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct dentry *dentry;
+ void *handle;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(id);
+ LASSERT(obd);
+
+ dentry = mds_id2dentry(obd, id, NULL);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+
+ if (!dentry->d_inode) {
+ CERROR("Can't find object "DLID4".\n",
+ OLID4(id));
+ GOTO(out_dentry, rc = -EINVAL);
+ }
+
+ handle = fsfilt_start(obd, dentry->d_inode,
+ FSFILT_OP_SETATTR, NULL);
+ if (IS_ERR(handle))
+ GOTO(out_dentry, rc = PTR_ERR(handle));
+
+ rc = mds_update_inode_mid(obd, dentry->d_inode, handle,
+ (struct lustre_id *)data);
+ if (rc) {
+ CERROR("Can't update inode "DLID4" master id, "
+ "error = %d.\n", OLID4(id), rc);
+ GOTO(out_commit, rc);
+ }
+
+out_commit:
+ fsfilt_commit(obd, mds->mds_sb, dentry->d_inode,
+ handle, 0);
+out_dentry:
+ l_dput(dentry);
+out:
+ RETURN(rc);
+}
+EXPORT_SYMBOL(mds_update_mid);
+
+/* read master MDS ID, which is stored in local inode EA. */
+int mds_read_mid(struct obd_device *obd, struct lustre_id *id,
+ void *data, int data_len)
+{
+ struct dentry *dentry;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(id);
+ LASSERT(obd);
+
+ dentry = mds_id2dentry(obd, id, NULL);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+
+ if (!dentry->d_inode) {
+ CERROR("Can't find object "DLID4".\n",
+ OLID4(id));
+ GOTO(out_dentry, rc = -EINVAL);
+ }
+
+ down(&dentry->d_inode->i_sem);
+ rc = mds_read_inode_mid(obd, dentry->d_inode,
+ (struct lustre_id *)data);
+ up(&dentry->d_inode->i_sem);
+ if (rc) {
+ CERROR("Can't read inode "DLID4" master id, "
+ "error = %d.\n", OLID4(id), rc);
+ GOTO(out_dentry, rc);
+ }
+
+out_dentry:
+ l_dput(dentry);
+out:
+ RETURN(rc);
+}
+EXPORT_SYMBOL(mds_read_mid);
+
int mds_reint(struct ptlrpc_request *req, int offset,
struct lustre_handle *lockh)
{
struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
- struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */
+ struct mds_update_record *rec;
struct mds_req_sec_desc *rsd;
int rc;
ENTRY;
/* rc will be used to interrupt a for loop over multiple records */
rc = mds_reint_rec(rec, offset, req, lockh);
-
mds_exit_ucred(&rec->ur_uc);
out:
OBD_FREE(rec, sizeof(*rec));
[REINT_OPEN] "open",
};
-#define FILTER_VALID_FLAGS (OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLGENER |\
- OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ|\
- OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME|\
+#define FILTER_VALID_FLAGS (OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLGENER | \
+ OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ| \
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME| \
OBD_MD_FLID)
static void reconstruct_create(struct ptlrpc_request *req)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
- struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
struct mds_client_data *mcd = med->med_mcd;
struct dentry *dentry;
struct ost_body *body;
- struct ll_fid fid;
+ struct lustre_id id;
+ int rc;
ENTRY;
/* copy rc, transno and disp; steal locks */
return;
}
- fid.mds = 0;
- fid.id = mcd->mcd_last_data;
- fid.generation = 0;
+ id_gen(&id) = 0;
+ id_group(&id) = 0;
- LASSERT(fid.id != 0);
+ id_ino(&id) = mcd->mcd_last_data;
+ LASSERT(id_ino(&id) != 0);
- dentry = mds_fid2dentry(mds, &fid, NULL);
+ dentry = mds_id2dentry(req2obd(req), &id, NULL);
if (IS_ERR(dentry)) {
- CERROR("can't find inode "LPU64"\n", fid.id);
+ CERROR("can't find inode "LPU64"\n", id_ino(&id));
req->rq_status = PTR_ERR(dentry);
EXIT;
return;
}
CWARN("reconstruct reply for x"LPU64" (remote ino) "LPU64" -> %lu/%u\n",
- req->rq_xid, fid.id, dentry->d_inode->i_ino,
+ req->rq_xid, id_ino(&id), dentry->d_inode->i_ino,
dentry->d_inode->i_generation);
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
body->oa.o_id = dentry->d_inode->i_ino;
body->oa.o_generation = dentry->d_inode->i_generation;
body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
+
+ down(&dentry->d_inode->i_sem);
+ rc = mds_read_inode_sid(req2obd(req), dentry->d_inode, &id);
+ up(&dentry->d_inode->i_sem);
+ if (rc) {
+ CERROR("Can't read inode self id, inode %lu, "
+ "rc %d\n", dentry->d_inode->i_ino, rc);
+ id_fid(&id) = 0;
+ }
+
+ body->oa.o_fid = id_fid(&id);
+ body->oa.o_mds = id_group(&id);
l_dput(dentry);
+
EXIT;
- return;
}
static int mdt_obj_create(struct ptlrpc_request *req)
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_obd *mds = &obd->u.mds;
struct ost_body *body, *repbody;
- char fidname[LL_FID_NAMELEN];
+ char idname[LL_ID_NAMELEN];
+ int size = sizeof(*repbody);
struct inode *parent_inode;
struct lvfs_run_ctxt saved;
+ int rc, cleanup_phase = 0;
struct dentry *new = NULL;
struct dentry_params dp;
- int mealen, flags = 0, rc, size = sizeof(*repbody), cleanup_phase = 0;
+ int mealen, flags = 0;
struct lvfs_ucred uc;
+ struct lustre_id id;
struct mea *mea;
void *handle = NULL;
unsigned long cr_inum = 0;
parent_inode = mds->mds_unnamed_dir->d_inode;
- body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+ lustre_swab_ost_body);
if (body == NULL)
RETURN(-EFAULT);
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+ /* in the case of recreate body->oa should have store cookie part
+ * initialized too, as it is needed to check if object is already
+ * created. But this is not cache flush case, as object was not created
+ * on this MDS at least once and its store cookie is not known. */
if (body->oa.o_flags & OBD_FL_RECREATE_OBJS) {
/* this is re-create request from MDS holding directory name.
- * we have to lookup given ino/generation first. if it exists
- * (good case) then there is nothing to do. if it does not
- * then we have to recreate it */
- struct ll_fid fid;
- fid.id = body->oa.o_id;
- fid.generation = body->oa.o_generation;
- new = mds_fid2dentry(mds, &fid, NULL);
+ * we have to lookup given ino/gen first. if it exists
+ * (good case) then there is nothing to do. if it does not then
+ * we have to recreate it */
+ id_fid(&id) = 0;
+ id_group(&id) = 0;
+
+ id_ino(&id) = body->oa.o_id;
+ id_gen(&id) = body->oa.o_generation;
+
+ new = mds_id2dentry(obd, &id, NULL);
if (!IS_ERR(new) && new->d_inode) {
- CDEBUG(D_HA, "mkdir() repairing is on its way: %lu/%lu\n",
- (unsigned long) fid.id,
- (unsigned long) fid.generation);
+ struct lustre_id sid;
+
+ CWARN("mkdir() repairing is on its way: %lu/%lu\n",
+ (unsigned long)id_ino(&id), (unsigned long)id_gen(&id));
+
obdo_from_inode(&repbody->oa, new->d_inode,
FILTER_VALID_FLAGS);
+
repbody->oa.o_id = new->d_inode->i_ino;
repbody->oa.o_generation = new->d_inode->i_generation;
repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
cleanup_phase = 1;
+
+ down(&new->d_inode->i_sem);
+ rc = mds_read_inode_sid(obd, new->d_inode, &sid);
+ up(&new->d_inode->i_sem);
+ if (rc) {
+ CERROR("Can't read inode self id "
+ "inode %lu, rc %d.\n",
+ new->d_inode->i_ino, rc);
+ GOTO(cleanup, rc);
+ }
+
+ repbody->oa.o_fid = id_fid(&sid);
+ repbody->oa.o_mds = id_group(&sid);
cr_inum = new->d_inode->i_ino;
GOTO(cleanup, rc = 0);
}
- CWARN("hmm. for some reason dir %lu/%lu (or reply) got lost\n",
- (unsigned long) fid.id, (unsigned long) fid.generation);
+ CWARN("for some reason dir %lu/%lu (or reply) got lost\n",
+ (unsigned long)id_ino(&id), (unsigned long)id_gen(&id));
+
LASSERT(new->d_inode == NULL ||
- new->d_inode->i_generation != fid.generation);
+ new->d_inode->i_generation != id_gen(&id));
+
l_dput(new);
}
handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
if (IS_ERR(handle)) {
up(&parent_inode->i_sem);
+ CERROR("fsfilt_start() failed, rc = %d\n",
+ (int)PTR_ERR(handle));
GOTO(cleanup, rc = PTR_ERR(handle));
}
cleanup_phase = 1; /* transaction */
repeat:
- rc = sprintf(fidname, "%u.%u", ll_insecure_random_int(), current->pid);
- new = lookup_one_len(fidname, mds->mds_unnamed_dir, rc);
+ rc = sprintf(idname, "%u.%u", ll_insecure_random_int(), current->pid);
+ new = lookup_one_len(idname, mds->mds_unnamed_dir, rc);
if (IS_ERR(new)) {
CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
- obd->obd_name, fidname, (int) PTR_ERR(new));
+ obd->obd_name, idname, (int) PTR_ERR(new));
fsfilt_commit(obd, mds->mds_sb, new->d_inode, handle, 0);
up(&parent_inode->i_sem);
RETURN(PTR_ERR(new));
goto repeat;
}
- new->d_fsdata = (void *) &dp;
+ new->d_fsdata = (void *)&dp;
dp.p_inum = 0;
dp.p_ptr = req;
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
- (unsigned long) body->oa.o_id,
- (unsigned long) body->oa.o_generation);
+ (unsigned long)body->oa.o_id,
+ (unsigned long)body->oa.o_generation);
dp.p_inum = body->oa.o_id;
}
+
rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
if (rc == 0) {
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
new->d_inode->i_generation = body->oa.o_generation;
mark_inode_dirty(new->d_inode);
- LASSERTF(body->oa.o_id == new->d_inode->i_ino,
- "BUG 3550: failed to recreate obj "
- LPU64" -> %lu\n",
- body->oa.o_id, new->d_inode->i_ino);
- LASSERTF(body->oa.o_generation ==
- new->d_inode->i_generation,
- "BUG 3550: failed to recreate obj/gen "
- LPU64"/%u -> %lu/%u\n",
- body->oa.o_id, body->oa.o_generation,
- new->d_inode->i_ino,
- new->d_inode->i_generation);
+
+ /*
+ * avoiding asserts in cache flush case, as
+ * @body->oa.o_id should be zero.
+ */
+ if (body->oa.o_id) {
+ LASSERTF(body->oa.o_id == new->d_inode->i_ino,
+ "BUG 3550: failed to recreate obj "
+ LPU64" -> %lu\n", body->oa.o_id,
+ new->d_inode->i_ino);
+
+ LASSERTF(body->oa.o_generation ==
+ new->d_inode->i_generation,
+ "BUG 3550: failed to recreate obj/gen "
+ LPU64"/%u -> %lu/%u\n", body->oa.o_id,
+ body->oa.o_generation,
+ new->d_inode->i_ino,
+ new->d_inode->i_generation);
+ }
}
-
+
obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
repbody->oa.o_id = new->d_inode->i_ino;
repbody->oa.o_generation = new->d_inode->i_generation;
repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ /* building lustre_id on passed @oa. */
+ id_group(&id) = mds->mds_num;
+
+ LASSERT(body->oa.o_fid != 0);
+ id_fid(&id) = body->oa.o_fid;
+
+ id_ino(&id) = repbody->oa.o_id;
+ id_gen(&id) = repbody->oa.o_generation;
+
+ down(&new->d_inode->i_sem);
+ rc = mds_update_inode_sid(obd, new->d_inode, handle, &id);
+ up(&new->d_inode->i_sem);
+
+ spin_lock(&mds->mds_fid_lock);
+ if (id_fid(&id) > mds->mds_last_fid)
+ mds->mds_last_fid = id_fid(&id);
+ spin_unlock(&mds->mds_fid_lock);
+ } else {
+ down(&new->d_inode->i_sem);
+ rc = mds_alloc_inode_sid(obd, new->d_inode, handle, &id);
+ up(&new->d_inode->i_sem);
+ }
+ if (rc) {
+ CERROR("Can't update lustre ID for inode %lu, "
+ "error = %d\n", new->d_inode->i_ino, rc);
+ GOTO(cleanup, rc);
+ }
+
+ /* initializing o_fid after it is allocated. */
+ repbody->oa.o_fid = id_fid(&id);
+ repbody->oa.o_mds = id_group(&id);
+
rc = fsfilt_del_dir_entry(obd, new);
up(&parent_inode->i_sem);
-
if (rc) {
CERROR("can't remove name for object: %d\n", rc);
GOTO(cleanup, rc);
}
-
+
cleanup_phase = 2; /* created directory object */
+ CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+ (unsigned long)new->d_inode->i_ino,
+ (unsigned long)new->d_inode->i_generation,
+ (unsigned)new->d_inode->i_mode);
cr_inum = new->d_inode->i_ino;
- CDEBUG(D_OTHER, "created dirobj: %lu/%u mode %o\n",
- new->d_inode->i_ino, new->d_inode->i_generation,
- new->d_inode->i_mode);
} else {
up(&parent_inode->i_sem);
CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
}
if (body->oa.o_valid & OBD_MD_FLID) {
- /* this is new object for splitted dir. we have to
- * prevent recursive splitting on it -bzzz */
+ /* this is new object for splitted dir. We have to prevent
+ * recursive splitting on it -bzzz */
mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+
OBD_ALLOC(mea, mealen);
if (mea == NULL)
GOTO(cleanup, rc = -ENOMEM);
+
mea->mea_magic = MEA_MAGIC_ALL_CHARS;
mea->mea_master = 0;
mea->mea_count = 0;
+
down(&new->d_inode->i_sem);
rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
up(&new->d_inode->i_sem);
+ if (rc)
+ CERROR("fsfilt_set_md() failed, rc = %d\n", rc);
+
OBD_FREE(mea, mealen);
CDEBUG(D_OTHER, "%s: mark non-splittable %lu/%u - %d\n",
obd->obd_name, new->d_inode->i_ino,
/* we pass LCK_EX to split routine to signal that we have
* exclusive access to the directory. simple because nobody
* knows it already exists -bzzz */
- mds_try_to_split_dir(obd, new, NULL, body->oa.o_easize, LCK_EX);
+ rc = mds_try_to_split_dir(obd, new, NULL,
+ body->oa.o_easize, LCK_EX);
+ if (rc < 0) {
+ CERROR("Can't split directory %lu, error = %d.\n",
+ new->d_inode->i_ino, rc);
+ } else {
+ rc = 0;
+ }
}
cleanup:
if (rc == 0)
ptlrpc_require_repack(req);
case 1: /* transaction */
- rc = mds_finish_transno(mds, parent_inode, handle, req, rc, cr_inum);
+ rc = mds_finish_transno(mds, parent_inode, handle,
+ req, rc, cr_inum);
}
l_dput(new);
static int mdt_get_info(struct ptlrpc_request *req)
{
- char *key;
struct obd_export *exp = req->rq_export;
- int keylen, rc = 0, size = sizeof(obd_id);
- obd_id *reply;
+ int keylen, rc = 0;
+ char *key;
ENTRY;
key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
}
keylen = req->rq_reqmsg->buflens[0];
- if (keylen < strlen("mdsize") || memcmp(key, "mdsize", 6) != 0)
+ if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
+ (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
+ (keylen < strlen("rootid") || strcmp(key, "rootid") != 0))
RETURN(-EPROTO);
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc)
- RETURN(rc);
+ if (keylen >= strlen("rootid") && !strcmp(key, "rootid")) {
+ struct lustre_id *reply;
+ int size = sizeof(*reply);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+ rc = obd_get_info(exp, keylen, key, &size, reply);
+ } else {
+ obd_id *reply;
+ int size = sizeof(*reply);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+ rc = obd_get_info(exp, keylen, key, &size, reply);
+ }
- reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
- rc = obd_get_info(exp, keylen, key, &size, reply);
req->rq_repmsg->status = 0;
RETURN(rc);
}
{
struct obd_device *obd;
struct mds_obd *mds;
- int rc = 0;
+ int rc = 0;
ENTRY;
obd = class_exp2obd(exp);
RETURN(-EINVAL);
}
-#define KEY_IS(str) \
- (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
-
mds = &obd->u.mds;
- if (KEY_IS("mds_num")) {
+ if (keylen >= strlen("mds_type") &&
+ memcmp(key, "mds_type", keylen) == 0) {
int valsize;
__u32 group;
- CDEBUG(D_IOCTL, "set mds num %d\n", *(int*)val);
- mds->mds_num = *(int*)val;
- group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+
+ CDEBUG(D_IOCTL, "set mds type to %x\n", *(int*)val);
+
+ mds->mds_obd_type = *(int*)val;
+ group = FILTER_GROUP_FIRST_MDS + mds->mds_obd_type;
valsize = sizeof(group);
- /*mds number has been changed, so the corresponding obdfilter exp
- *need to be changed too*/
- rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
- valsize, &group);
+
+ /* mds number has been changed, so the corresponding obdfilter
+ * exp need to be changed too. */
+ rc = obd_set_info(mds->mds_lov_exp, strlen("mds_conn"),
+ "mds_conn", valsize, &group);
RETURN(rc);
}
CDEBUG(D_IOCTL, "invalid key\n");
}
keylen = req->rq_reqmsg->buflens[0];
- if (keylen == strlen("mds_num") &&
- memcmp(key, "mds_num", keylen) == 0) {
+ if (keylen == strlen("mds_type") &&
+ memcmp(key, "mds_type", keylen) == 0) {
rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc)
RETURN(rc);
+
val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
-
vallen = req->rq_reqmsg->buflens[1];
rc = obd_set_info(exp, keylen, key, vallen, val);
req->rq_repmsg->status = 0;
RETURN(rc);
- } else if (keylen == strlen("client") &&
- memcmp(key, "client", keylen) == 0) {
- rc = lustre_pack_reply(req, 0, NULL, NULL);
- if (rc)
- RETURN(rc);
- rc = obd_set_info(exp, keylen, key, sizeof(obd_id), NULL);
- req->rq_repmsg->status = 0;
- RETURN(rc);
- }
+ }
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
}
break;
-
case MDS_REINT: {
__u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
sizeof (*opcp));
return 0;
}
-/* Update the server data on disk. This stores the new mount_count and
- * also the last_rcvd value to disk. If we don't have a clean shutdown,
- * then the server last_rcvd value may be less than that of the clients.
- * This will alert us that we may need to do client recovery.
+/* Update the server data on disk. This stores the new mount_count and also the
+ * last_rcvd value to disk. If we don't have a clean shutdown, then the server
+ * last_rcvd value may be less than that of the clients. This will alert us
+ * that we may need to do client recovery.
*
* Also assumes for mds_last_transno that we are not modifying it (no locking).
*/
RETURN(rc);
}
+/* saves last allocated fid counter to file. */
+int mds_update_last_fid(struct obd_device *obd, int force_sync)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct file *filp = mds->mds_fid_filp;
+ struct lvfs_run_ctxt saved;
+ loff_t off = 0;
+ int rc;
+ ENTRY;
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ CDEBUG(D_SUPER, "MDS last_fid is "LPU64"\n", mds->mds_last_fid);
+
+ rc = fsfilt_write_record(obd, filp, &mds->mds_last_fid,
+ sizeof(mds->mds_last_fid),
+ &off, force_sync);
+ if (rc)
+ CERROR("error writing MDS last_fid: rc = %d\n", rc);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ RETURN(rc);
+}
+
+void mds_commit_last_transno_cb(struct obd_device *obd,
+ __u64 transno, void *data,
+ int error)
+{
+ obd_transno_commit_cb(obd, transno, error);
+}
+
+void mds_commit_last_fid_cb(struct obd_device *obd,
+ __u64 fid, void *data,
+ int error)
+{
+ if (error) {
+ CERROR("%s: fid "LPD64" commit error: %d\n",
+ obd->obd_name, fid, error);
+ return;
+ }
+
+ CDEBUG(D_HA, "%s: fid "LPD64" committed\n",
+ obd->obd_name, fid);
+}
+
+/*
+ * allocates new lustre_id on passed @inode and saves it to inode EA.
+ */
+int mds_alloc_inode_sid(struct obd_device *obd, struct inode *inode,
+ void *handle, struct lustre_id *id)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(id != NULL);
+ LASSERT(obd != NULL);
+
+ id_group(id) = mds->mds_num;
+
+ spin_lock(&mds->mds_fid_lock);
+ id_fid(id) = mds->mds_last_fid++;
+ spin_unlock(&mds->mds_fid_lock);
+
+ id_ino(id) = inode->i_ino;
+ id_gen(id) = inode->i_generation;
+ id_type(id) = (S_IFMT & inode->i_mode);
+
+ rc = mds_update_inode_sid(obd, inode, handle, id);
+ if (rc) {
+ CERROR("Can't update inode FID EA, "
+ "rc = %d\n", rc);
+ }
+ RETURN(rc);
+}
+
+/*
+ * reads inode self id from inode EA. Probably later this should be replaced by
+ * caching inode self id to avoid raeding it every time it is needed.
+ */
+int mds_read_inode_sid(struct obd_device *obd, struct inode *inode,
+ struct lustre_id *id)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(id != NULL);
+ LASSERT(obd != NULL);
+ LASSERT(inode != NULL);
+
+ rc = fsfilt_get_sid(obd, inode, &id->li_fid,
+ sizeof(id->li_fid));
+ if (rc < 0) {
+ CERROR("fsfilt_get_sid() failed, "
+ "rc = %d\n", rc);
+ RETURN(rc);
+ } else if (!rc) {
+ rc = -ENODATA;
+ RETURN(rc);
+ } else {
+ rc = 0;
+ }
+
+ RETURN(rc);
+}
+
+/* updates inode self id in EA. */
+int mds_update_inode_sid(struct obd_device *obd, struct inode *inode,
+ void *handle, struct lustre_id *id)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(id != NULL);
+ LASSERT(obd != NULL);
+ LASSERT(inode != NULL);
+
+ rc = fsfilt_set_sid(obd, inode, handle, &id->li_fid,
+ sizeof(id->li_fid));
+ if (rc) {
+ CERROR("fsfilt_set_sid() failed, rc = %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * reads inode id on master MDS. This is usualy done by CMOBD to update requests
+ * to master MDS by correct store cookie, needed to find inode on master MDS
+ * quickly.
+ */
+int mds_read_inode_mid(struct obd_device *obd, struct inode *inode,
+ struct lustre_id *id)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(id != NULL);
+ LASSERT(obd != NULL);
+ LASSERT(inode != NULL);
+
+ rc = fsfilt_get_mid(obd, inode, id, sizeof(*id));
+ if (rc < 0) {
+ CERROR("fsfilt_get_mid() failed, "
+ "rc = %d\n", rc);
+ RETURN(rc);
+ } else if (!rc) {
+ rc = -ENODATA;
+ RETURN(rc);
+ } else {
+ rc = 0;
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * updates master inode id. Usualy this is done by CMOBD after an inode is
+ * created and relationship between cache MDS and master one should be
+ * established.
+ */
+int mds_update_inode_mid(struct obd_device *obd, struct inode *inode,
+ void *handle, struct lustre_id *id)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(id != NULL);
+ LASSERT(obd != NULL);
+ LASSERT(inode != NULL);
+
+ rc = fsfilt_set_mid(obd, inode, handle, id, sizeof(*id));
+ if (rc) {
+ CERROR("fsfilt_set_mid() failed, rc = %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
/* mount the file system (secretly) */
static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
{
options = (char *)page;
memset(options, 0, PAGE_SIZE);
- /* here we use "iopen_nopriv" hardcoded, because it affects MDS utility
+ /*
+ * here we use "iopen_nopriv" hardcoded, because it affects MDS utility
* and the rest of options are passed by mount options. Probably this
- * should be moved to somewhere else like startup scripts or lconf. */
+ * should be moved to somewhere else like startup scripts or lconf.
+ */
sprintf(options, "iopen_nopriv");
if (lcfg->lcfg_inllen4 > 0 && lcfg->lcfg_inlbuf4)
}
}
- /* FIXME-WANGDI: this should be reworked when we will use lmv along
- * with cobd, because correct mdsnum is set in mds_lmv_connect(). */
- if (lcfg->lcfg_inllen6 > 0 && lcfg->lcfg_inlbuf6 && !mds->mds_lmv_obd &&
+ mds->mds_obd_type = MDS_MASTER_OBD;
+
+ if (lcfg->lcfg_inllen6 > 0 && lcfg->lcfg_inlbuf6 &&
strcmp(lcfg->lcfg_inlbuf6, "dumb")) {
- if (!memcmp(lcfg->lcfg_inlbuf6, "master", strlen("master")) &&
- mds->mds_num == 0) {
- mds->mds_num = REAL_MDS_NUMBER;
- } else if (!memcmp(lcfg->lcfg_inlbuf6, "cache", strlen("cache")) &&
- mds->mds_num == 0) {
- mds->mds_num = CACHE_MDS_NUMBER;
+ if (!memcmp(lcfg->lcfg_inlbuf6, "master", strlen("master"))) {
+ mds->mds_obd_type = MDS_MASTER_OBD;
+ } else if (!memcmp(lcfg->lcfg_inlbuf6, "cache", strlen("cache"))) {
+ mds->mds_obd_type = MDS_CACHE_OBD;
}
}
- mnt = do_kern_mount(lcfg->lcfg_inlbuf2, 0,
- lcfg->lcfg_inlbuf1, options);
-
+ mnt = do_kern_mount(lcfg->lcfg_inlbuf2, 0, lcfg->lcfg_inlbuf1, options);
free_page(page);
if (IS_ERR(mnt)) {
CDEBUG(D_SUPER, "%s: mnt = %p\n", lcfg->lcfg_inlbuf1, mnt);
- sema_init(&mds->mds_orphan_recovery_sem, 1);
sema_init(&mds->mds_epoch_sem, 1);
+ spin_lock_init(&mds->mds_fid_lock);
+ atomic_set(&mds->mds_real_clients, 0);
spin_lock_init(&mds->mds_transno_lock);
+ sema_init(&mds->mds_orphan_recovery_sem, 1);
mds->mds_max_cookiesize = sizeof(struct llog_cookie);
- atomic_set(&mds->mds_real_clients, 0);
sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
obd->obd_name, rc);
GOTO(err_ns, rc);
}
-
+
rc = llog_start_commit_thread();
if (rc < 0)
GOTO(err_fs, rc);
- /* this check for @dumb string is needed to handle mounting MDS with
- * smfs. Read lconf:MDSDEV.write_conf() for more details. */
+ /*
+ * this check for @dumb string is needed to handle mounting MDS with
+ * smfs. Read lconf:MDSDEV.write_conf() for more details.
+ */
if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3 &&
strcmp(lcfg->lcfg_inlbuf3, "dumb")) {
class_uuid_t uuid;
memcpy(mds->mds_profile, lcfg->lcfg_inlbuf3,
lcfg->lcfg_inllen3);
+
+ /*
+ * setup root id in the case this is not clients write
+ * setup. This is important, as in the case of LMV we need
+ * mds->mds_num to be already assigned to form correct root fid.
+ */
+ rc = mds_fs_setup_rootid(obd);
+ if (rc)
+ GOTO(err_fs, rc);
+
+ /* setup lustre id for ID directory. */
+ rc = mds_fs_setup_virtid(obd);
+ if (rc)
+ GOTO(err_fs, rc);
}
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
CERROR("No profile found: %s\n", mds->mds_profile);
GOTO(err_cleanup, rc = -ENOENT);
}
- rc = mds_lov_connect(obd, lprof->lp_osc);
+ rc = mds_lov_connect(obd, lprof->lp_lov);
if (rc)
GOTO(err_cleanup, rc);
group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
valsize = sizeof(group);
- rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
+ rc = obd_set_info(mds->mds_lov_exp, strlen("mds_conn"), "mds_conn",
valsize, &group);
if (rc)
GOTO(out, rc);
if (mds->mds_profile) {
char * cln_prof;
- struct config_llog_instance cfg;
+ struct llog_ctxt *llctx;
struct lvfs_run_ctxt saved;
+ struct config_llog_instance cfg;
int len = strlen(mds->mds_profile) + sizeof("-clean") + 1;
OBD_ALLOC(cln_prof, len);
cfg.cfg_uuid = mds->mds_lov_uuid;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- class_config_process_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
- cln_prof, &cfg);
+ llctx = llog_get_context(&obd->obd_llogs,
+ LLOG_CONFIG_ORIG_CTXT);
+ class_config_process_llog(llctx, cln_prof, &cfg);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
OBD_FREE(cln_prof, len);
if (mds->mds_sb == NULL)
RETURN(0);
+ mds_update_last_fid(obd, 1);
mds_update_server_data(obd, 1);
+
if (mds->mds_lov_objids != NULL) {
- OBD_FREE(mds->mds_lov_objids,
- mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
+ int size = mds->mds_lov_desc.ld_tgt_count *
+ sizeof(obd_id);
+ OBD_FREE(mds->mds_lov_objids, size);
}
mds_fs_cleanup(obd, flags);
unlock_kernel();
- /* 2 seems normal on mds, (may_umount() also expects 2
- fwiw), but we only see 1 at this point in obdfilter. */
+ /*
+ * 2 seems normal on mds, (may_umount() also expects 2 fwiw), but we
+ * only see 1 at this point in obdfilter.
+ */
if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
CERROR("%s: mount busy, mnt_count %d != 2\n", obd->obd_name,
atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
mntput(mds->mds_vfsmnt);
-
mds->mds_sb = 0;
ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
if (rc)
RETURN(req->rq_status = rc);
- rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
- intent_set_disposition(rep, DISP_IT_EXECD);
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+ LASSERT(rep != NULL);
+ intent_set_disposition(rep, DISP_IT_EXECD);
/* execute policy */
switch ((long)it->opc) {
LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
it->opc, lockh[0].cookie);
-
/* If we've already given this lock to a client once, then we should
* have no readers or writers. Otherwise, we should have one reader
* _or_ writer ref (which will be zeroed below) before returning the
RETURN(0);
}
-static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
- void *data)
+static struct dentry *mds_lvfs_id2dentry(__u64 ino, __u32 gen,
+ __u64 gr, void *data)
{
+ struct lustre_id id;
struct obd_device *obd = data;
- struct ll_fid fid;
- fid.id = id;
- fid.generation = gen;
- return mds_fid2dentry(&obd->u.mds, &fid, NULL);
+
+ id_ino(&id) = ino;
+ id_gen(&id) = gen;
+ return mds_id2dentry(obd, &id, NULL);
}
static int mds_get_info(struct obd_export *exp, __u32 keylen,
- void *key, __u32 *vallen, void *val)
+ void *key, __u32 *valsize, void *val)
{
struct obd_device *obd;
struct mds_obd *mds;
ENTRY;
obd = class_exp2obd(exp);
+ mds = &obd->u.mds;
+
if (obd == NULL) {
CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
exp->exp_handle.h_cookie);
}
if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
- /*Get log_context handle*/
+ /* get log_context handle. */
unsigned long *llh_handle = val;
- *vallen = sizeof(unsigned long);
+ *valsize = sizeof(unsigned long);
*llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
RETURN(0);
}
if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
- /*Get log_context handle*/
+ /* get log_context handle. */
unsigned long *sb = val;
- *vallen = sizeof(unsigned long);
+ *valsize = sizeof(unsigned long);
*sb = (unsigned long)obd->u.mds.mds_sb;
RETURN(0);
}
- mds = &obd->u.mds;
if (keylen >= strlen("mdsize") && memcmp(key, "mdsize", keylen) == 0) {
__u32 *mdsize = val;
- *vallen = sizeof(*mdsize);
+ *valsize = sizeof(*mdsize);
*mdsize = mds->mds_max_mdsize;
RETURN(0);
}
+ if (keylen >= strlen("mdsnum") && strcmp(key, "mdsnum") == 0) {
+ __u32 *mdsnum = val;
+ *valsize = sizeof(*mdsnum);
+ *mdsnum = mds->mds_num;
+ RETURN(0);
+ }
+
+ if (keylen >= strlen("rootid") && strcmp(key, "rootid") == 0) {
+ struct lustre_id *rootid = val;
+ *valsize = sizeof(struct lustre_id);
+ *rootid = mds->mds_rootid;
+ RETURN(0);
+ }
+
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
struct lvfs_callback_ops mds_lvfs_ops = {
- l_fid2dentry: mds_lvfs_fid2dentry,
+ l_id2dentry: mds_lvfs_id2dentry,
};
int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
int niocount, struct niobuf_remote *nb,
struct niobuf_local *res,
struct obd_trans_info *oti);
+
int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj, int niocount,
struct niobuf_local *res, struct obd_trans_info *oti,
.o_detach = mdt_detach,
.o_setup = mdt_setup,
.o_cleanup = mdt_cleanup,
- .o_attach = mdt_attach,
- .o_detach = mdt_detach,
};
static int __init mds_init(void)