#include <linux/random.h>
#include <linux/fs.h>
#include <linux/jbd.h>
-#include <linux/ext3_fs.h>
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-# include <linux/smp_lock.h>
-# include <linux/buffer_head.h>
-# include <linux/workqueue.h>
-# include <linux/mount.h>
-#else
-# include <linux/locks.h>
-#endif
+#include <linux/smp_lock.h>
+#include <linux/buffer_head.h>
+#include <linux/workqueue.h>
+#include <linux/mount.h>
+#include <linux/lustre_acl.h>
#include <obd_class.h>
#include <lustre_dlm.h>
#include <obd_lov.h>
CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
"number of MDS service threads to start");
+__u32 mds_max_ost_index=0xFFFF;
+CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
+ "maximal OST index");
+
static int mds_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
ldlm_mode_t mode, int flags, void *data);
for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
- pages[i] = alloc_pages(GFP_KERNEL, 0);
+ OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
if (pages[i] == NULL)
GOTO(cleanup_buf, rc = -ENOMEM);
tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
tmpsize, offset, file->f_dentry->d_inode->i_ino,
- file->f_dentry->d_inode->i_size);
+ i_size_read(file->f_dentry->d_inode));
rc = fsfilt_readpage(req->rq_export->exp_obd, file,
kmap(pages[i]), tmpsize, &offset);
rc = -ETIMEDOUT; /* XXX should this be a different errno? */
}
- DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
+ DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
(rc == -ETIMEDOUT) ? "timeout" : "network error",
desc->bd_nob_transferred, count,
req->rq_export->exp_client_uuid.uuid,
cleanup_buf:
for (i = 0; i < npages; i++)
if (pages[i])
- __free_pages(pages[i], 0);
+ OBD_PAGE_FREE(pages[i]);
ptlrpc_free_bulk(desc);
out_free:
struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
struct vfsmount **mnt, int lock_mode,
struct lustre_handle *lockh,
- char *name, int namelen, __u64 lockpart)
+ __u64 lockpart)
{
struct mds_obd *mds = &obd->u.mds;
struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
struct ldlm_res_id res_id = { .name = {0} };
int flags = LDLM_FL_ATOMIC_CB, rc;
- ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
+ ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
ENTRY;
if (IS_ERR(de))
res_id.name[0] = de->d_inode->i_ino;
res_id.name[1] = de->d_inode->i_generation;
- rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id,
- LDLM_IBITS, &policy, lock_mode, &flags,
+ rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
+ LDLM_IBITS, &policy, lock_mode, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
NULL, NULL, 0, NULL, lockh);
if (rc != ELDLM_OK) {
RETURN(result);
}
-static int mds_connect_internal(struct obd_export *exp,
+static int mds_connect_internal(struct obd_export *exp,
struct obd_connect_data *data)
{
struct obd_device *obd = exp->exp_obd;
if (data != NULL) {
- data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
+ data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
data->ocd_ibits_known &= MDS_INODELOCK_FULL;
/* If no known bits (which should not happen, probably,
return 0;
}
-static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int mds_reconnect(const struct lu_env *env,
+ struct obd_export *exp, struct obd_device *obd,
struct obd_uuid *cluuid,
struct obd_connect_data *data)
{
* about that client, like open files, the last operation number it did
* on the server, etc.
*/
-static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid, struct obd_connect_data *data)
+static int mds_connect(const struct lu_env *env,
+ struct lustre_handle *conn, struct obd_device *obd,
+ struct obd_uuid *cluuid, struct obd_connect_data *data,
+ void *localdata)
{
struct obd_export *exp;
struct mds_export_data *med;
struct mds_client_data *mcd = NULL;
- int rc, abort_recovery;
+ int rc;
ENTRY;
if (!conn || !obd || !cluuid)
RETURN(-EINVAL);
- /* Check for aborted recovery. */
- spin_lock_bh(&obd->obd_processing_task_lock);
- abort_recovery = obd->obd_abort_recovery;
- spin_unlock_bh(&obd->obd_processing_task_lock);
- if (abort_recovery)
- target_abort_recovery(obd);
-
/* XXX There is a small race between checking the list and adding a
* new connection for the same UUID, but the real threat (list
* corruption when multiple different clients connect) is solved.
LASSERT(exp);
med = &exp->exp_mds_data;
+ exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
+
rc = mds_connect_internal(exp, data);
if (rc)
GOTO(out, rc);
memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
med->med_mcd = mcd;
- rc = mds_client_add(obd, exp, -1);
+ rc = mds_client_add(obd, exp, -1, localdata);
GOTO(out, rc);
out:
{
struct mds_export_data *med = &exp->exp_mds_data;
- INIT_LIST_HEAD(&med->med_open_head);
+ CFS_INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
+
+ spin_lock(&exp->exp_lock);
exp->exp_connecting = 1;
+ spin_unlock(&exp->exp_lock);
+
RETURN(0);
}
CWARN("%s: allocation failure during cleanup; can not force "
"close file handles on this service.\n", obd->obd_name);
OBD_FREE(lmm, mds->mds_max_mdsize);
- GOTO(out, rc = -ENOMEM);
+ GOTO(out_lmm, rc = -ENOMEM);
}
spin_lock(&med->med_open_lock);
mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
mfd->mfd_dentry->d_inode->i_ino);
- rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1);
+ rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1);
if (rc < 0)
CWARN("mds_get_md failure, rc=%d\n", rc);
else
/* child orphan sem protects orphan_dec_test and
* is_orphan race, mds_mfd_close drops it */
MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
-
rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
!(export->exp_flags & OBD_OPT_FAILOVER),
lmm, lmm_size, logcookies,
spin_lock(&med->med_open_lock);
}
+ spin_unlock(&med->med_open_lock);
OBD_FREE(logcookies, mds->mds_max_cookiesize);
+out_lmm:
OBD_FREE(lmm, mds->mds_max_mdsize);
-
- spin_unlock(&med->med_open_lock);
-
+out:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
mds_client_free(export);
-
- out:
RETURN(rc);
}
int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ENTRY;
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+ RETURN(req->rq_status = -ENOMEM);
rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
- CERROR("mds: out of memory for message\n");
- req->rq_status = -ENOMEM; /* superfluous? */
- RETURN(-ENOMEM);
- }
+ if (rc)
+ RETURN(req->rq_status = rc);
body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
/* get the LOV EA from @inode and store it into @md. It can be at most
* @size bytes, and @size is updated with the actual EA size.
- * The EA size is also returned on success, and -ve errno on failure.
+ * The EA size is also returned on success, and -ve errno on failure.
* If there is no EA then 0 is returned. */
int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
int *size, int lock)
if (!inode->i_op || !inode->i_op->getxattr)
GOTO(out, 0);
- lock_24kernel();
rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
lustre_msg_buf(repmsg, repoff, buflen),
buflen);
- unlock_24kernel();
if (rc >= 0)
repbody->aclsize = rc;
LASSERT(offset == REQ_REC_OFF); /* non-intent */
body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
- LASSERT(body != NULL); /* checked by caller */
- LASSERT_REQSWABBED(req, offset); /* swabbed by caller */
+ LASSERT(body != NULL); /* checked by caller */
+ LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */
if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
(S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
}
bufcount++;
} else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
- if (inode->i_size + 1 != body->eadatasize)
+ if (i_size_read(inode) + 1 != body->eadatasize)
CERROR("symlink size: %Lu, reply space: %d\n",
- inode->i_size + 1, body->eadatasize);
- size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
+ i_size_read(inode) + 1, body->eadatasize);
+ size[bufcount] = min_t(int, i_size_read(inode) + 1,
+ body->eadatasize);
bufcount++;
CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
- inode->i_size + 1, body->eadatasize);
+ i_size_read(inode) + 1, body->eadatasize);
}
#ifdef CONFIG_FS_POSIX_ACL
size[bufcount] = 0;
if (inode->i_op && inode->i_op->getxattr) {
- lock_24kernel();
rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
NULL, 0);
- unlock_24kernel();
if (rc < 0) {
if (rc != -ENODATA) {
rc = lustre_pack_reply(req, bufcount, size, NULL);
if (rc) {
- CERROR("lustre_pack_reply failed: rc %d\n", rc);
req->rq_status = rc;
RETURN(rc);
}
struct lvfs_run_ctxt saved;
struct mds_body *body;
struct dentry *dparent = NULL, *dchild = NULL;
- struct lvfs_ucred uc = {NULL,};
+ struct lvfs_ucred uc = {0,};
struct lustre_handle parent_lockh;
int namesize;
int rc = 0, cleanup_phase = 0, resent_req = 0;
RETURN(-EFAULT);
}
- LASSERT_REQSWAB(req, offset + 1);
+ lustre_set_req_swabbed(req, offset + 1);
name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
if (name == NULL) {
CERROR("Can't unpack name\n");
if (resent_req == 0) {
if (name) {
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
- rc = mds_get_parent_child_locked(obd, &obd->u.mds,
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
+ rc = mds_get_parent_child_locked(obd, &obd->u.mds,
&body->fid1,
- &parent_lockh,
+ &parent_lockh,
&dparent, LCK_CR,
MDS_INODELOCK_UPDATE,
name, namesize,
/* For revalidate by fid we always take UPDATE lock */
dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
LCK_CR, child_lockh,
- NULL, 0, child_part);
+ child_part);
LASSERT(dchild);
if (IS_ERR(dchild))
rc = PTR_ERR(dchild);
- }
+ }
if (rc)
GOTO(cleanup, rc);
} else {
default:
mds_exit_ucred(&uc, mds);
if (req->rq_reply_state == NULL) {
+ int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+ if (rc == 0)
+ rc = rc2;
req->rq_status = rc;
- lustre_pack_reply(req, 1, NULL, NULL);
}
}
return rc;
struct lvfs_run_ctxt saved;
struct dentry *de;
struct mds_body *body;
- struct lvfs_ucred uc = { NULL, };
+ struct lvfs_ucred uc = {0,};
int rc = 0;
ENTRY;
GOTO(out_pop, rc);
}
- req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
+ req->rq_status = mds_getattr_internal(obd, de, req, body,
+ REPLY_REC_OFF);
l_dput(de);
GOTO(out_pop, rc);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
out_ucred:
if (req->rq_reply_state == NULL) {
+ int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+ if (rc == 0)
+ rc = rc2;
req->rq_status = rc;
- lustre_pack_reply(req, 1, NULL, NULL);
}
mds_exit_ucred(&uc, mds);
return rc;
}
static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- __u64 max_age)
+ __u64 max_age, __u32 flags)
{
int rc;
(MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
OBD_COUNTER_INCREMENT(obd, statfs);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
+ GOTO(out, rc = -ENOMEM);
rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
- CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
+ if (rc)
GOTO(out, rc);
- }
/* We call this so that we can cache a bit - 1 jiffie worth */
rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
size[REPLY_REC_OFF]),
- cfs_time_current_64() - HZ);
+ cfs_time_current_64() - HZ, 0);
if (rc) {
CERROR("mds_obd_statfs failed: rc %d\n", rc);
GOTO(out, rc);
if (body == NULL)
GOTO(out, rc = -EFAULT);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
+ GOTO(out, rc = -ENOMEM);
rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
- CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
+ if (rc)
GOTO(out, rc);
- }
- if (body->fid1.id == 0) {
- /* a fid of zero is taken to mean "sync whole filesystem" */
- rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
- GOTO(out, rc);
- } else {
+ rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
+ if (rc == 0 && body->fid1.id != 0) {
struct dentry *de;
de = mds_fid2dentry(mds, &body->fid1, NULL);
if (IS_ERR(de))
GOTO(out, rc = PTR_ERR(de));
- /* The file parameter isn't used for anything */
- if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
- rc = de->d_inode->i_fop->fsync(NULL, de, 1);
- if (rc == 0) {
- body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*body));
- mds_pack_inode2fid(&body->fid1, de->d_inode);
- mds_pack_inode2body(body, de->d_inode);
- }
+ body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+ sizeof(*body));
+ mds_pack_inode2fid(&body->fid1, de->d_inode);
+ mds_pack_inode2body(body, de->d_inode);
l_dput(de);
- GOTO(out, rc);
}
+ GOTO(out, rc);
out:
req->rq_status = rc;
return 0;
struct mds_body *body, *repbody;
struct lvfs_run_ctxt saved;
int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
- struct lvfs_ucred uc = {NULL,};
+ struct lvfs_ucred uc = {0,};
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
RETURN(-ENOMEM);
-
rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc) {
- CERROR("error packing readpage reply: rc %d\n", rc);
+ if (rc)
GOTO(out, rc);
- }
body = lustre_swab_reqbuf(req, offset, sizeof(*body),
lustre_swab_mds_body);
GOTO(out_pop, rc = PTR_ERR(file));
/* body->size is actually the offset -eeb */
- if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
+ if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) {
CERROR("offset "LPU64" not on a block boundary of %lu\n",
- body->size, de->d_inode->i_blksize);
+ body->size, de->d_inode->i_sb->s_blocksize);
GOTO(out_file, rc = -EFAULT);
}
/* body->nlink is actually the #bytes to read -eeb */
- if (body->nlink & (de->d_inode->i_blksize - 1)) {
+ if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) {
CERROR("size %u is not multiple of blocksize %lu\n",
- body->nlink, de->d_inode->i_blksize);
+ body->nlink, de->d_inode->i_sb->s_blocksize);
GOTO(out_file, rc = -EFAULT);
}
repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
sizeof(*repbody));
- repbody->size = file->f_dentry->d_inode->i_size;
+ repbody->size = i_size_read(file->f_dentry->d_inode);
repbody->valid = OBD_MD_FLSIZE;
/* to make this asynchronous make sure that the handling function
return rc;
}
-static int mds_filter_recovery_request(struct ptlrpc_request *req,
- struct obd_device *obd, int *process)
+int mds_filter_recovery_request(struct ptlrpc_request *req,
+ struct obd_device *obd, int *process)
{
switch (lustre_msg_get_opc(req->rq_reqmsg)) {
case MDS_CONNECT: /* This will never get here, but for completeness. */
RETURN(0);
case MDS_CLOSE:
+ case MDS_DONE_WRITING:
case MDS_SYNC: /* used in unmounting */
case OBD_PING:
case MDS_REINT:
+ case SEQ_QUERY:
+ case FLD_QUERY:
case LDLM_ENQUEUE:
*process = target_queue_recovery_request(req, obd);
RETURN(0);
default:
DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
- *process = 0;
- /* XXX what should we set rq_status to here? */
- req->rq_status = -EAGAIN;
- RETURN(ptlrpc_error(req));
+ *process = -EAGAIN;
+ RETURN(0);
}
}
+EXPORT_SYMBOL(mds_filter_recovery_request);
static char *reint_names[] = {
[REINT_SETATTR] "setattr",
static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
{
- char *key;
- __u32 *val;
- int keylen, rc = 0;
+ void *key, *val;
+ int keylen, vallen, rc = 0;
ENTRY;
key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
}
keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
- val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*val));
- if (val == NULL) {
- DEBUG_REQ(D_HA, req, "no set_info val");
- RETURN(-EFAULT);
- }
+ val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
+ vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
rc = lustre_pack_reply(req, 1, NULL, NULL);
if (rc)
RETURN(rc);
+
lustre_msg_set_status(req->rq_repmsg, 0);
- if (keylen < strlen("read-only") ||
- memcmp(key, "read-only", keylen) != 0)
- RETURN(-EINVAL);
+ if (KEY_IS("read-only")) {
+ if (val == NULL || vallen < sizeof(__u32)) {
+ DEBUG_REQ(D_HA, req, "no set_info val");
+ RETURN(-EFAULT);
+ }
- if (*val)
- exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
- else
- exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+ if (*(__u32 *)val)
+ exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
+ else
+ exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+ } else {
+ RETURN(-EINVAL);
+ }
RETURN(0);
}
RETURN(-EPROTO);
rc = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc) {
- CERROR("mds: out of memory while packing quotacheck reply\n");
+ if (rc)
RETURN(rc);
- }
req->rq_status = obd_quotacheck(req->rq_export, oqctl);
RETURN(0);
RETURN(0);
}
-static int mds_msg_check_version(struct lustre_msg *msg)
+int mds_msg_check_version(struct lustre_msg *msg)
{
int rc;
case MDS_CONNECT:
case MDS_DISCONNECT:
case OBD_PING:
+ case SEC_CTX_INIT:
+ case SEC_CTX_INIT_CONT:
+ case SEC_CTX_FINI:
rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
if (rc)
CERROR("bad opc %u version %08x, expecting %08x\n",
case MDS_GETATTR_NAME:
case MDS_STATFS:
case MDS_READPAGE:
+ case MDS_WRITEPAGE:
+ case MDS_IS_SUBDIR:
case MDS_REINT:
case MDS_CLOSE:
case MDS_DONE_WRITING:
case MDS_QUOTACTL:
case QUOTA_DQACQ:
case QUOTA_DQREL:
+ case SEQ_QUERY:
+ case FLD_QUERY:
rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
if (rc)
CERROR("bad opc %u version %08x, expecting %08x\n",
}
return rc;
}
+EXPORT_SYMBOL(mds_msg_check_version);
int mds_handle(struct ptlrpc_request *req)
{
int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
- int rc = 0;
+ int rc;
struct mds_obd *mds = NULL; /* quell gcc overwarning */
struct obd_device *obd = NULL;
ENTRY;
- OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+ if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE))
+ RETURN(0);
LASSERT(current->journal_info == NULL);
/* XXX identical to OST */
if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
struct mds_export_data *med;
- int recovering, abort_recovery;
+ int recovering;
if (req->rq_export == NULL) {
CERROR("operation %d on unconnected MDS from %s\n",
med = &req->rq_export->exp_mds_data;
obd = req->rq_export->exp_obd;
- mds = &obd->u.mds;
+ mds = mds_req2mds(req);
/* sanity check: if the xid matches, the request must
* be marked as a resent or replayed */
/* Check for aborted recovery. */
spin_lock_bh(&obd->obd_processing_task_lock);
- abort_recovery = obd->obd_abort_recovery;
recovering = obd->obd_recovering;
spin_unlock_bh(&obd->obd_processing_task_lock);
- if (abort_recovery) {
- target_abort_recovery(obd);
- } else if (recovering) {
+ if (recovering) {
rc = mds_filter_recovery_request(req, obd,
&should_process);
if (rc || !should_process)
RETURN(rc);
+ else if (should_process < 0) {
+ req->rq_status = should_process;
+ rc = ptlrpc_error(req);
+ RETURN(rc);
+ }
}
}
switch (lustre_msg_get_opc(req->rq_reqmsg)) {
case MDS_CONNECT:
DEBUG_REQ(D_INODE, req, "connect");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
- rc = target_handle_connect(req, mds_handle);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET))
+ RETURN(0);
+ rc = target_handle_connect(req);
if (!rc) {
/* Now that we have an export, set mds. */
+ /*
+ * XXX nikita: these assignments are useless: mds is
+ * never used below, and obd is only used for
+ * MSG_LAST_REPLAY case, which never happens for
+ * MDS_CONNECT.
+ */
obd = req->rq_export->exp_obd;
mds = mds_req2mds(req);
}
case MDS_DISCONNECT:
DEBUG_REQ(D_INODE, req, "disconnect");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET))
+ RETURN(0);
rc = target_handle_disconnect(req);
req->rq_status = rc; /* superfluous? */
break;
case MDS_GETSTATUS:
DEBUG_REQ(D_INODE, req, "getstatus");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET))
+ RETURN(0);
rc = mds_getstatus(req);
break;
case MDS_GETATTR:
DEBUG_REQ(D_INODE, req, "getattr");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET))
+ RETURN(0);
rc = mds_getattr(req, REQ_REC_OFF);
break;
case MDS_SETXATTR:
DEBUG_REQ(D_INODE, req, "setxattr");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET))
+ RETURN(0);
rc = mds_setxattr(req);
break;
case MDS_GETXATTR:
DEBUG_REQ(D_INODE, req, "getxattr");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET))
+ RETURN(0);
rc = mds_getxattr(req);
break;
case MDS_GETATTR_NAME: {
struct lustre_handle lockh = { 0 };
DEBUG_REQ(D_INODE, req, "getattr_name");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET))
+ RETURN(0);
/* If this request gets a reconstructed reply, we won't be
* acquiring any new locks in mds_getattr_lock, so we don't
}
case MDS_STATFS:
DEBUG_REQ(D_INODE, req, "statfs");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET))
+ RETURN(0);
rc = mds_statfs(req);
break;
case MDS_READPAGE:
DEBUG_REQ(D_INODE, req, "readpage");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET))
+ RETURN(0);
rc = mds_readpage(req, REQ_REC_OFF);
- if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
RETURN(0);
}
__u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
sizeof(*opcp));
__u32 opc;
+ int op = 0;
int size[4] = { sizeof(struct ptlrpc_body),
sizeof(struct mds_body),
mds->mds_max_mdsize,
(opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
reint_names[opc] == NULL) ? reint_names[opc] :
"unknown opcode");
+ switch (opc) {
+ case REINT_CREATE:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE;
+ break;
+ case REINT_LINK:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_LINK;
+ break;
+ case REINT_OPEN:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN;
+ break;
+ case REINT_SETATTR:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR;
+ break;
+ case REINT_RENAME:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME;
+ break;
+ case REINT_UNLINK:
+ op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK;
+ break;
+ default:
+ op = 0;
+ break;
+ }
+
+ if (op && req->rq_rqbd->rqbd_service->srv_stats)
+ lprocfs_counter_incr(
+ req->rq_rqbd->rqbd_service->srv_stats, op);
- OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET))
+ RETURN(0);
if (opc == REINT_UNLINK || opc == REINT_RENAME)
bufcount = 4;
case MDS_CLOSE:
DEBUG_REQ(D_INODE, req, "close");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET))
+ RETURN(0);
rc = mds_close(req, REQ_REC_OFF);
+ fail = OBD_FAIL_MDS_CLOSE_NET_REP;
break;
case MDS_DONE_WRITING:
DEBUG_REQ(D_INODE, req, "done_writing");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET))
+ RETURN(0);
rc = mds_done_writing(req, REQ_REC_OFF);
break;
case MDS_PIN:
DEBUG_REQ(D_INODE, req, "pin");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET))
+ RETURN(0);
rc = mds_pin(req, REQ_REC_OFF);
break;
case MDS_SYNC:
DEBUG_REQ(D_INODE, req, "sync");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET))
+ RETURN(0);
rc = mds_sync(req, REQ_REC_OFF);
break;
case MDS_QUOTACHECK:
DEBUG_REQ(D_INODE, req, "quotacheck");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET))
+ RETURN(0);
rc = mds_handle_quotacheck(req);
break;
case MDS_QUOTACTL:
DEBUG_REQ(D_INODE, req, "quotactl");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET))
+ RETURN(0);
rc = mds_handle_quotactl(req);
break;
case OBD_LOG_CANCEL:
CDEBUG(D_INODE, "log cancel\n");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
+ RETURN(0);
rc = -ENOTSUPP; /* la la la */
break;
case LDLM_ENQUEUE:
DEBUG_REQ(D_INODE, req, "enqueue");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
+ RETURN(0);
rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
ldlm_server_blocking_ast, NULL);
fail = OBD_FAIL_LDLM_REPLY;
break;
case LDLM_CONVERT:
DEBUG_REQ(D_INODE, req, "convert");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
+ RETURN(0);
rc = ldlm_handle_convert(req);
break;
case LDLM_BL_CALLBACK:
DEBUG_REQ(D_INODE, req, "callback");
CERROR("callbacks should not happen on MDS\n");
LBUG();
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
+ RETURN(0);
break;
case LLOG_ORIGIN_HANDLE_CREATE:
DEBUG_REQ(D_INODE, req, "llog_init");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_create(req);
break;
case LLOG_ORIGIN_HANDLE_DESTROY:
DEBUG_REQ(D_INODE, req, "llog_init");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_destroy(req);
break;
case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
DEBUG_REQ(D_INODE, req, "llog next block");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_next_block(req);
break;
case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
DEBUG_REQ(D_INODE, req, "llog prev block");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_prev_block(req);
break;
case LLOG_ORIGIN_HANDLE_READ_HEADER:
DEBUG_REQ(D_INODE, req, "llog read header");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_read_header(req);
break;
case LLOG_ORIGIN_HANDLE_CLOSE:
DEBUG_REQ(D_INODE, req, "llog close");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_origin_handle_close(req);
break;
case LLOG_CATINFO:
DEBUG_REQ(D_INODE, req, "llog catinfo");
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+ RETURN(0);
rc = llog_catinfo(req);
break;
default:
/* If we're DISCONNECTing, the mds_export_data is already freed */
if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
struct mds_export_data *med = &req->rq_export->exp_mds_data;
-
+
/* I don't think last_xid is used for anyway, so I'm not sure
if we need to care about last_close_xid here.*/
lustre_msg_set_last_xid(req->rq_repmsg,
EXIT;
out:
- if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
- if (obd && obd->obd_recovering) {
- DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
- return target_queue_final_reply(req, rc);
- }
- /* Lost a race with recovery; let the error path DTRT. */
- rc = req->rq_status = -ENOTCONN;
- }
-
target_send_reply(req, rc, fail);
return 0;
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
if (rc)
CERROR("error writing MDS server data: rc = %d\n", rc);
-
RETURN(rc);
}
options = ++p;
}
}
+static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
+{
+ int rc = 0;
+ ENTRY;
+
+ if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
+ class_uuid_t uuid;
+
+ ll_generate_random_uuid(uuid);
+ class_uuid_unparse(uuid, &mds->mds_lov_uuid);
+
+ OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
+ if (mds->mds_profile == NULL)
+ RETURN(-ENOMEM);
+
+ strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
+ LUSTRE_CFG_BUFLEN(lcfg, 3));
+ }
+ RETURN(rc);
+}
/* mount the file system (secretly). lustre_cfg parameters are:
* 1 = device
* 3 = config name
* 4 = mount options
*/
-static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
{
struct lprocfs_static_vars lvars;
- struct lustre_cfg* lcfg = buf;
struct mds_obd *mds = &obd->u.mds;
- struct lustre_sb_info *lsi;
struct lustre_mount_info *lmi;
struct vfsmount *mnt;
+ struct lustre_sb_info *lsi;
struct obd_uuid uuid;
__u8 *uuid_ptr;
char *str, *label;
/* We mounted in lustre_fill_super.
lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
+
lsi = s2lsi(lmi->lmi_sb);
fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
mds->mds_max_mdsize = sizeof(struct lov_mds_md);
mds->mds_max_cookiesize = sizeof(struct llog_cookie);
mds->mds_atime_diff = MAX_ATIME_DIFF;
+ mds->mds_evict_ost_nids = 1;
sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
- obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+ obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+ LDLM_NAMESPACE_GREEDY);
if (obd->obd_namespace == NULL) {
mds_cleanup(obd);
GOTO(err_ops, rc = -ENOMEM);
}
ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
+ lprocfs_mds_init_vars(&lvars);
+ if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
+ lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
+ /* Init private stats here */
+ mds_stats_counter_init(obd->obd_stats);
+ obd->obd_proc_exports_entry = lprocfs_register("exports",
+ obd->obd_proc_entry,
+ NULL, NULL);
+ if (IS_ERR(obd->obd_proc_exports_entry)) {
+ rc = PTR_ERR(obd->obd_proc_exports_entry);
+ CERROR("error %d setting up lprocfs for %s\n",
+ rc, "exports");
+ obd->obd_proc_exports_entry = NULL;
+ }
+ }
+
rc = mds_fs_setup(obd, mnt);
if (rc) {
CERROR("%s: MDS filesystem method init failed: rc = %d\n",
GOTO(err_ns, rc);
}
- rc = llog_start_commit_thread();
+ if (obd->obd_proc_exports_entry)
+ lprocfs_add_simple(obd->obd_proc_exports_entry,
+ "clear", lprocfs_nid_stats_clear_read,
+ lprocfs_nid_stats_clear_write, obd);
+
+ rc = mds_lov_presetup(mds, lcfg);
if (rc < 0)
GOTO(err_fs, rc);
- if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
- class_uuid_t uuid;
-
- ll_generate_random_uuid(uuid);
- class_uuid_unparse(uuid, &mds->mds_lov_uuid);
-
- OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
- if (mds->mds_profile == NULL)
- GOTO(err_fs, rc = -ENOMEM);
-
- strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
- LUSTRE_CFG_BUFLEN(lcfg, 3));
- }
-
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"mds_ldlm_client", &obd->obd_ldlm_client);
obd->obd_replayable = 1;
if (rc)
GOTO(err_fs, rc);
+#if 0
mds->mds_group_hash = upcall_cache_init(obd->obd_name);
if (IS_ERR(mds->mds_group_hash)) {
rc = PTR_ERR(mds->mds_group_hash);
mds->mds_group_hash = NULL;
GOTO(err_qctxt, rc);
}
+#endif
/* Don't wait for mds_postrecov trying to clear orphans */
obd->obd_async_recov = 1;
if (rc)
GOTO(err_qctxt, rc);
- lprocfs_init_vars(mds, &lvars);
- if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
- lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
- /* Init private stats here */
- mds_stats_counter_init(obd->obd_stats);
- obd->obd_proc_exports = proc_mkdir("exports",
- obd->obd_proc_entry);
- }
-
uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
if (uuid_ptr != NULL) {
class_uuid_unparse(uuid_ptr, &uuid);
"/proc/fs/lustre/mds/%s/recovery_status.\n",
obd->obd_name, lustre_cfg_string(lcfg, 1),
label ?: "", label ? "/" : "", str,
- obd->obd_recoverable_clients,
- (obd->obd_recoverable_clients == 1) ?
+ obd->obd_max_recoverable_clients,
+ (obd->obd_max_recoverable_clients == 1) ?
"client" : "clients",
(int)(OBD_RECOVERY_TIMEOUT) / 60,
(int)(OBD_RECOVERY_TIMEOUT) % 60,
err_fs:
/* No extra cleanup needed for llog_init_commit_thread() */
mds_fs_cleanup(obd);
+#if 0
upcall_cache_cleanup(mds->mds_group_hash);
mds->mds_group_hash = NULL;
+#endif
err_ns:
+ lprocfs_free_obd_stats(obd);
+ lprocfs_obd_cleanup(obd);
ldlm_namespace_free(obd->obd_namespace, 0);
obd->obd_namespace = NULL;
err_ops:
int rc = 0;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
if (mds->mds_profile) {
struct lustre_profile *lprof;
- /* The profile defines which osc and mdc to connect to, for a
+ /* The profile defines which osc and mdc to connect to, for a
client. We reuse that here to figure out the name of the
- lov to use (and ignore lprof->lp_mdc).
- The profile was set in the config log with
+ lov to use (and ignore lprof->lp_md).
+ The profile was set in the config log with
LCFG_MOUNTOPT profilenm oscnm mdcnm */
lprof = class_get_profile(mds->mds_profile);
if (lprof == NULL) {
CERROR("No profile found: %s\n", mds->mds_profile);
GOTO(err_cleanup, rc = -ENOENT);
}
- rc = mds_lov_connect(obd, lprof->lp_osc);
+ rc = mds_lov_connect(obd, lprof->lp_dt);
if (rc)
GOTO(err_cleanup, rc);
}
int mds_postrecov(struct obd_device *obd)
{
- int rc;
+ int rc = 0;
ENTRY;
if (obd->obd_fail)
RETURN(0);
LASSERT(!obd->obd_recovering);
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
-
- /* FIXME why not put this in the synchronize? */
- /* set nextid first, so we are sure it happens */
- rc = mds_lov_set_nextid(obd);
- if (rc) {
- CERROR("%s: mds_lov_set_nextid failed %d\n",
- obd->obd_name, rc);
- GOTO(out, rc);
- }
+ LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
/* clean PENDING dir */
- rc = mds_cleanup_pending(obd);
- if (rc < 0)
- GOTO(out, rc);
+ if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+ rc = mds_cleanup_pending(obd);
+ if (rc < 0)
+ GOTO(out, rc);
/* FIXME Does target_finish_recovery really need this to block? */
/* Notify the LOV, which will in turn call mds_notify for each tgt */
/* This means that we have to hack obd_notify to think we're obd_set_up
during mds_lov_connect. */
- obd_notify(obd->u.mds.mds_osc_obd, NULL,
+ obd_notify(obd->u.mds.mds_osc_obd, NULL,
obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
OBD_NOTIFY_SYNC, NULL);
case OBD_CLEANUP_EARLY:
break;
case OBD_CLEANUP_EXPORTS:
- target_cleanup_recovery(obd);
+ /*XXX Use this for mdd mds cleanup, so comment out
+ *this target_cleanup_recovery for this tmp MDD MDS
+ *Wangdi*/
+ if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+ target_cleanup_recovery(obd);
mds_lov_early_clean(obd);
break;
case OBD_CLEANUP_SELF_EXP:
we just need to drop our ref */
class_export_put(mds->mds_osc_exp);
- lprocfs_obd_cleanup(obd);
+ lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+ lprocfs_free_per_client_stats(obd);
lprocfs_free_obd_stats(obd);
+ lprocfs_obd_cleanup(obd);
lquota_cleanup(mds_quota_interface_ref, obd);
mds_update_server_data(obd, 1);
- if (mds->mds_lov_objids != NULL)
- OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+ /* XXX
+ mds_lov_destroy_objids(obd);
+ */
mds_fs_cleanup(obd);
+#if 0
upcall_cache_cleanup(mds->mds_group_hash);
mds->mds_group_hash = NULL;
+#endif
server_put_mount(obd->obd_name, mds->mds_vfsmnt);
obd->u.obt.obt_sb = NULL;
struct obd_export *exp = req->rq_export;
struct ldlm_request *dlmreq =
lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
- struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+ struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
struct list_head *iter;
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
lockh->cookie = lock->l_handle.h_cookie;
LDLM_DEBUG(lock, "restoring lock cookie");
- DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+ DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64,
lockh->cookie);
if (old_lock)
*old_lock = LDLM_LOCK_GET(lock);
lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
- DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+ DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
remote_hdl.cookie);
}
if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
/* No intent was provided */
rc = lustre_pack_reply(req, 2, repsize, NULL);
- LASSERT(rc == 0);
+ if (rc)
+ RETURN(rc);
RETURN(0);
}
RETURN(ELDLM_LOCK_REPLACED);
}
-static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct mds_obd *mds = &obd->u.mds;
struct lprocfs_static_vars lvars;
int rc = 0;
ENTRY;
- lprocfs_init_vars(mdt, &lvars);
+ lprocfs_mdt_init_vars(&lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
sema_init(&mds->mds_health_sem, 1);
mds_max_threads = mds_min_threads = mds_num_threads;
} else {
/* Base min threads on memory and cpus */
- mds_min_threads = smp_num_cpus * num_physpages >>
+ mds_min_threads = num_possible_cpus() * num_physpages >>
(27 - CFS_PAGE_SHIFT);
if (mds_min_threads < MDS_THREADS_MIN)
mds_min_threads = MDS_THREADS_MIN;
/* Largest auto threads start value */
- if (mds_min_threads > 32)
+ if (mds_min_threads > 32)
mds_min_threads = 32;
mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
}
MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
mds_handle, LUSTRE_MDS_NAME,
- obd->obd_proc_entry, NULL,
- mds_min_threads, mds_max_threads, "ll_mdt");
+ obd->obd_proc_entry, NULL,
+ mds_min_threads, mds_max_threads, "ll_mdt", 0);
if (!mds->mds_service) {
CERROR("failed to start service\n");
mds_handle, "mds_setattr",
obd->obd_proc_entry, NULL,
mds_min_threads, mds_max_threads,
- "ll_mdt_attr");
+ "ll_mdt_attr", 0);
if (!mds->mds_setattr_service) {
CERROR("failed to start getattr service\n");
GOTO(err_thread, rc = -ENOMEM);
MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
mds_handle, "mds_readpage",
- obd->obd_proc_entry, NULL,
+ obd->obd_proc_entry, NULL,
MDS_THREADS_MIN_READPAGE, mds_max_threads,
- "ll_mdt_rdpg");
+ "ll_mdt_rdpg", 0);
if (!mds->mds_readpage_service) {
CERROR("failed to start readpage service\n");
GOTO(err_thread2, rc = -ENOMEM);
static int mds_health_check(struct obd_device *obd)
{
struct obd_device_target *odt = &obd->u.obt;
+#ifdef USE_HEALTH_CHECK_WRITE
struct mds_obd *mds = &obd->u.mds;
+#endif
int rc = 0;
if (odt->obt_sb->s_flags & MS_RDONLY)
rc = 1;
+#ifdef USE_HEALTH_CHECK_WRITE
LASSERT(mds->mds_health_check_filp != NULL);
rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
-
+#endif
return rc;
}
struct lprocfs_static_vars lvars;
int rc;
- lprocfs_init_vars(mds, &lvars);
-
+ lprocfs_mds_init_vars(&lvars);
+
rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
-
return(rc);
}
quota_interface_t *mds_quota_interface_ref;
extern quota_interface_t mds_quota_interface;
-static int __init mds_init(void)
+static __attribute__((unused)) int __init mds_init(void)
{
int rc;
struct lprocfs_static_vars lvars;
return rc;
}
init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
-
- lprocfs_init_vars(mds, &lvars);
- class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
- lprocfs_init_vars(mdt, &lvars);
- class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
+
+ lprocfs_mds_init_vars(&lvars);
+ class_register_type(&mds_obd_ops, NULL,
+ lvars.module_vars, LUSTRE_MDS_NAME, NULL);
+ lprocfs_mds_init_vars(&lvars);
+ mdt_obd_ops = mdt_obd_ops; //make compiler happy
+// class_register_type(&mdt_obd_ops, NULL,
+// lvars.module_vars, LUSTRE_MDT_NAME, NULL);
return 0;
}
-static void /*__exit*/ mds_exit(void)
+static __attribute__((unused)) void /*__exit*/ mds_exit(void)
{
lquota_exit(mds_quota_interface_ref);
if (mds_quota_interface_ref)
PORTAL_SYMBOL_PUT(mds_quota_interface);
class_unregister_type(LUSTRE_MDS_NAME);
- class_unregister_type(LUSTRE_MDT_NAME);
+// class_unregister_type(LUSTRE_MDT_NAME);
+}
+/*mds still need lov setup here*/
+static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct lvfs_run_ctxt saved;
+ const char *dev;
+ struct vfsmount *mnt;
+ struct lustre_sb_info *lsi;
+ struct lustre_mount_info *lmi;
+ struct dentry *dentry;
+ int rc = 0;
+ ENTRY;
+
+ CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
+ if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+ RETURN(0);
+
+ if (lcfg->lcfg_bufcount < 5) {
+ CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
+ RETURN(-EINVAL);
+ }
+ dev = lustre_cfg_string(lcfg, 4);
+ lmi = server_get_mount(dev);
+ LASSERT(lmi != NULL);
+
+ lsi = s2lsi(lmi->lmi_sb);
+ mnt = lmi->lmi_mnt;
+ /* FIXME: MDD LOV initialize objects.
+ * we need only lmi here but not get mount
+ * OSD did mount already, so put mount back
+ */
+ atomic_dec(&lsi->lsi_mounts);
+ mntput(mnt);
+
+ obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
+ mds_init_ctxt(obd, mnt);
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
+ if (IS_ERR(dentry)) {
+ rc = PTR_ERR(dentry);
+ CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
+ GOTO(err_putfs, rc);
+ }
+ mds->mds_objects_dir = dentry;
+
+ dentry = lookup_one_len("__iopen__", current->fs->pwd,
+ strlen("__iopen__"));
+ if (IS_ERR(dentry)) {
+ rc = PTR_ERR(dentry);
+ CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
+ GOTO(err_objects, rc);
+ }
+
+ mds->mds_fid_de = dentry;
+ if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
+ rc = -ENOENT;
+ CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
+ GOTO(err_fid, rc);
+ }
+ rc = mds_lov_init_objids(obd);
+ if (rc != 0) {
+ CERROR("cannot init lov objid rc = %d\n", rc);
+ GOTO(err_fid, rc );
+ }
+
+ rc = mds_lov_presetup(mds, lcfg);
+ if (rc < 0)
+ GOTO(err_objects, rc);
+
+ /* Don't wait for mds_postrecov trying to clear orphans */
+ obd->obd_async_recov = 1;
+ rc = mds_postsetup(obd);
+ /* Bug 11557 - allow async abort_recov start
+ FIXME can remove most of this obd_async_recov plumbing
+ obd->obd_async_recov = 0;
+ */
+
+ if (rc)
+ GOTO(err_objects, rc);
+
+ mds->mds_max_mdsize = sizeof(struct lov_mds_md);
+ mds->mds_max_cookiesize = sizeof(struct llog_cookie);
+
+err_pop:
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ RETURN(rc);
+err_fid:
+ dput(mds->mds_fid_de);
+err_objects:
+ dput(mds->mds_objects_dir);
+err_putfs:
+ fsfilt_put_ops(obd->obd_fsops);
+ goto err_pop;
+}
+
+static int mds_cmd_cleanup(struct obd_device *obd)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct lvfs_run_ctxt saved;
+ int rc = 0;
+ ENTRY;
+
+ if (obd->obd_fail)
+ LCONSOLE_WARN("%s: shutting down for failover; client state "
+ "will be preserved.\n", obd->obd_name);
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ mds_lov_destroy_objids(obd);
+
+ if (mds->mds_objects_dir != NULL) {
+ l_dput(mds->mds_objects_dir);
+ mds->mds_objects_dir = NULL;
+ }
+
+ shrink_dcache_parent(mds->mds_fid_de);
+ dput(mds->mds_fid_de);
+ LL_DQUOT_OFF(obd->u.obt.obt_sb);
+ fsfilt_put_ops(obd->obd_fsops);
+
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ RETURN(rc);
+}
+
+#if 0
+static int mds_cmd_health_check(struct obd_device *obd)
+{
+ return 0;
+}
+#endif
+static struct obd_ops mds_cmd_obd_ops = {
+ .o_owner = THIS_MODULE,
+ .o_setup = mds_cmd_setup,
+ .o_cleanup = mds_cmd_cleanup,
+ .o_precleanup = mds_precleanup,
+ .o_create = mds_obd_create,
+ .o_destroy = mds_obd_destroy,
+ .o_llog_init = mds_llog_init,
+ .o_llog_finish = mds_llog_finish,
+ .o_notify = mds_notify,
+ .o_postrecov = mds_postrecov,
+ // .o_health_check = mds_cmd_health_check,
+};
+
+static int __init mds_cmd_init(void)
+{
+ struct lprocfs_static_vars lvars;
+
+ lprocfs_mds_init_vars(&lvars);
+ class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
+ LUSTRE_MDS_NAME, NULL);
+
+ return 0;
+}
+
+static void /*__exit*/ mds_cmd_exit(void)
+{
+ class_unregister_type(LUSTRE_MDS_NAME);
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
MODULE_LICENSE("GPL");
-module_init(mds_init);
-module_exit(mds_exit);
+module_init(mds_cmd_init);
+module_exit(mds_cmd_exit);