X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=6d1a6e7eb2cbd931f5afa458bf6cf108acfa4b91;hb=d2d56f38da01;hp=3a9919435f3be4688c16803adb9aa724acf546d6;hpb=bf527ab7e56d4445f81223b23302b3cbf0dc5fb1;p=fs%2Flustre-release.git diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 3a99194..6d1a6e7 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -50,6 +50,7 @@ # include #endif +#include #include #include #include @@ -142,7 +143,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, rc = -ETIMEDOUT; /* XXX should this be a different errno? */ } - DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", + DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s", (rc == -ETIMEDOUT) ? "timeout" : "network error", desc->bd_nob_transferred, count, req->rq_export->exp_client_uuid.uuid, @@ -169,13 +170,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, - char *name, int namelen, __u64 lockpart) + __u64 lockpart) { struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; int flags = LDLM_FL_ATOMIC_CB, rc; - ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; ENTRY; if (IS_ERR(de)) @@ -183,8 +184,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, - LDLM_IBITS, &policy, lock_mode, &flags, + rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, + LDLM_IBITS, &policy, lock_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { @@ -254,12 +255,12 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, RETURN(result); } -static int mds_connect_internal(struct obd_export *exp, +static int mds_connect_internal(struct obd_export *exp, struct obd_connect_data *data) { struct obd_device *obd = exp->exp_obd; if (data != NULL) { - data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED; + data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; data->ocd_ibits_known &= MDS_INODELOCK_FULL; /* If no known bits (which should not happen, probably, @@ -310,25 +311,19 @@ static int mds_reconnect(struct obd_export *exp, struct obd_device *obd, * about that client, like open files, the last operation number it did * on the server, etc. */ -static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, +static int mds_connect(const struct lu_env *env, + struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data) { struct obd_export *exp; struct mds_export_data *med; struct mds_client_data *mcd = NULL; - int rc, abort_recovery; + int rc; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(obd); - /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. @@ -380,7 +375,7 @@ int mds_init_export(struct obd_export *exp) INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); - + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); @@ -420,7 +415,7 @@ static int mds_destroy_export(struct obd_export *export) CWARN("%s: allocation failure during cleanup; can not force " "close file handles on this service.\n", obd->obd_name); OBD_FREE(lmm, mds->mds_max_mdsize); - GOTO(out, rc = -ENOMEM); + GOTO(out_lmm, rc = -ENOMEM); } spin_lock(&med->med_open_lock); @@ -444,7 +439,7 @@ static int mds_destroy_export(struct obd_export *export) mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name, mfd->mfd_dentry->d_inode->i_ino); - rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1); + rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1); if (rc < 0) CWARN("mds_get_md failure, rc=%d\n", rc); else @@ -453,7 +448,6 @@ static int mds_destroy_export(struct obd_export *export) /* child orphan sem protects orphan_dec_test and * is_orphan race, mds_mfd_close drops it */ MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode); - rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd, !(export->exp_flags & OBD_OPT_FAILOVER), lmm, lmm_size, logcookies, @@ -476,16 +470,14 @@ static int mds_destroy_export(struct obd_export *export) spin_lock(&med->med_open_lock); } + spin_unlock(&med->med_open_lock); OBD_FREE(logcookies, mds->mds_max_cookiesize); +out_lmm: OBD_FREE(lmm, mds->mds_max_mdsize); - - spin_unlock(&med->med_open_lock); - +out: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); mds_client_free(export); - - out: RETURN(rc); } @@ -546,7 +538,7 @@ static int mds_getstatus(struct ptlrpc_request *req) /* get the LOV EA from @inode and store it into @md. It can be at most * @size bytes, and @size is updated with the actual EA size. - * The EA size is also returned on success, and -ve errno on failure. + * The EA size is also returned on success, and -ve errno on failure. * If there is no EA then 0 is returned. */ int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, int *size, int lock) @@ -864,7 +856,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, struct lvfs_run_ctxt saved; struct mds_body *body; struct dentry *dparent = NULL, *dchild = NULL; - struct lvfs_ucred uc = {NULL,}; + struct lvfs_ucred uc = {0,}; struct lustre_handle parent_lockh; int namesize; int rc = 0, cleanup_phase = 0, resent_req = 0; @@ -939,10 +931,10 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, if (resent_req == 0) { if (name) { - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2); - rc = mds_get_parent_child_locked(obd, &obd->u.mds, + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); + rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, - &parent_lockh, + &parent_lockh, &dparent, LCK_CR, MDS_INODELOCK_UPDATE, name, namesize, @@ -952,11 +944,11 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, /* For revalidate by fid we always take UPDATE lock */ dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, LCK_CR, child_lockh, - NULL, 0, child_part); + child_part); LASSERT(dchild); if (IS_ERR(dchild)) rc = PTR_ERR(dchild); - } + } if (rc) GOTO(cleanup, rc); } else { @@ -1031,7 +1023,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset) struct lvfs_run_ctxt saved; struct dentry *de; struct mds_body *body; - struct lvfs_ucred uc = { NULL, }; + struct lvfs_ucred uc = {0,}; int rc = 0; ENTRY; @@ -1059,7 +1051,8 @@ static int mds_getattr(struct ptlrpc_request *req, int offset) GOTO(out_pop, rc); } - req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF); + req->rq_status = mds_getattr_internal(obd, de, req, body, + REPLY_REC_OFF); l_dput(de); GOTO(out_pop, rc); @@ -1184,7 +1177,7 @@ static int mds_readpage(struct ptlrpc_request *req, int offset) struct mds_body *body, *repbody; struct lvfs_run_ctxt saved; int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; - struct lvfs_ucred uc = {NULL,}; + struct lvfs_ucred uc = {0,}; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) @@ -1275,8 +1268,8 @@ int mds_reint(struct ptlrpc_request *req, int offset, return rc; } -static int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process) +int mds_filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) { switch (lustre_msg_get_opc(req->rq_reqmsg)) { case MDS_CONNECT: /* This will never get here, but for completeness. */ @@ -1287,21 +1280,23 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req, RETURN(0); case MDS_CLOSE: + case MDS_DONE_WRITING: case MDS_SYNC: /* used in unmounting */ case OBD_PING: case MDS_REINT: + case SEQ_QUERY: + case FLD_QUERY: case LDLM_ENQUEUE: *process = target_queue_recovery_request(req, obd); RETURN(0); default: DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); - *process = 0; - /* XXX what should we set rq_status to here? */ - req->rq_status = -EAGAIN; - RETURN(ptlrpc_error(req)); + *process = -EAGAIN; + RETURN(0); } } +EXPORT_SYMBOL(mds_filter_recovery_request); static char *reint_names[] = { [REINT_SETATTR] "setattr", @@ -1392,7 +1387,7 @@ static int mds_handle_quotactl(struct ptlrpc_request *req) RETURN(0); } -static int mds_msg_check_version(struct lustre_msg *msg) +int mds_msg_check_version(struct lustre_msg *msg) { int rc; @@ -1400,6 +1395,9 @@ static int mds_msg_check_version(struct lustre_msg *msg) case MDS_CONNECT: case MDS_DISCONNECT: case OBD_PING: + case SEC_CTX_INIT: + case SEC_CTX_INIT_CONT: + case SEC_CTX_FINI: rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", @@ -1412,6 +1410,8 @@ static int mds_msg_check_version(struct lustre_msg *msg) case MDS_GETATTR_NAME: case MDS_STATFS: case MDS_READPAGE: + case MDS_WRITEPAGE: + case MDS_IS_SUBDIR: case MDS_REINT: case MDS_CLOSE: case MDS_DONE_WRITING: @@ -1424,6 +1424,8 @@ static int mds_msg_check_version(struct lustre_msg *msg) case MDS_QUOTACTL: case QUOTA_DQACQ: case QUOTA_DQREL: + case SEQ_QUERY: + case FLD_QUERY: rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", @@ -1463,11 +1465,12 @@ static int mds_msg_check_version(struct lustre_msg *msg) } return rc; } +EXPORT_SYMBOL(mds_msg_check_version); int mds_handle(struct ptlrpc_request *req) { int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; - int rc = 0; + int rc; struct mds_obd *mds = NULL; /* quell gcc overwarning */ struct obd_device *obd = NULL; ENTRY; @@ -1485,7 +1488,7 @@ int mds_handle(struct ptlrpc_request *req) /* XXX identical to OST */ if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) { struct mds_export_data *med; - int recovering, abort_recovery; + int recovering; if (req->rq_export == NULL) { CERROR("operation %d on unconnected MDS from %s\n", @@ -1497,7 +1500,7 @@ int mds_handle(struct ptlrpc_request *req) med = &req->rq_export->exp_mds_data; obd = req->rq_export->exp_obd; - mds = &obd->u.mds; + mds = mds_req2mds(req); /* sanity check: if the xid matches, the request must * be marked as a resent or replayed */ @@ -1520,16 +1523,18 @@ int mds_handle(struct ptlrpc_request *req) /* Check for aborted recovery. */ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { rc = mds_filter_recovery_request(req, obd, &should_process); if (rc || !should_process) RETURN(rc); + else if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); + RETURN(rc); + } } } @@ -1537,9 +1542,15 @@ int mds_handle(struct ptlrpc_request *req) case MDS_CONNECT: DEBUG_REQ(D_INODE, req, "connect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); - rc = target_handle_connect(req, mds_handle); + rc = target_handle_connect(req); if (!rc) { /* Now that we have an export, set mds. */ + /* + * XXX nikita: these assignments are useless: mds is + * never used below, and obd is only used for + * MSG_LAST_REPLAY case, which never happens for + * MDS_CONNECT. + */ obd = req->rq_export->exp_obd; mds = mds_req2mds(req); } @@ -1770,7 +1781,7 @@ int mds_handle(struct ptlrpc_request *req) /* If we're DISCONNECTing, the mds_export_data is already freed */ if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) { struct mds_export_data *med = &req->rq_export->exp_mds_data; - + /* I don't think last_xid is used for anyway, so I'm not sure if we need to care about last_close_xid here.*/ lustre_msg_set_last_xid(req->rq_repmsg, @@ -1782,15 +1793,6 @@ int mds_handle(struct ptlrpc_request *req) EXIT; out: - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - return target_queue_final_reply(req, rc); - } - /* Lost a race with recovery; let the error path DTRT. */ - rc = req->rq_status = -ENOTCONN; - } - target_send_reply(req, rc, fail); return 0; } @@ -1824,7 +1826,6 @@ int mds_update_server_data(struct obd_device *obd, int force_sync) pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc) CERROR("error writing MDS server data: rc = %d\n", rc); - RETURN(rc); } @@ -1869,6 +1870,30 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options) options = ++p; } } +static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg) +{ + int rc; + ENTRY; + + rc = llog_start_commit_thread(); + if (rc < 0) + RETURN(rc); + + if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { + class_uuid_t uuid; + + ll_generate_random_uuid(uuid); + class_uuid_unparse(uuid, &mds->mds_lov_uuid); + + OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3)); + if (mds->mds_profile == NULL) + RETURN(-ENOMEM); + + strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3), + LUSTRE_CFG_BUFLEN(lcfg, 3)); + } + RETURN(rc); +} /* mount the file system (secretly). lustre_cfg parameters are: * 1 = device @@ -1876,14 +1901,13 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options) * 3 = config name * 4 = mount options */ -static int mds_setup(struct obd_device *obd, obd_count len, void *buf) +static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { struct lprocfs_static_vars lvars; - struct lustre_cfg* lcfg = buf; struct mds_obd *mds = &obd->u.mds; - struct lustre_sb_info *lsi; struct lustre_mount_info *lmi; struct vfsmount *mnt; + struct lustre_sb_info *lsi; struct obd_uuid uuid; __u8 *uuid_ptr; char *str, *label; @@ -1910,6 +1934,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) /* We mounted in lustre_fill_super. lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/ + lsi = s2lsi(lmi->lmi_sb); fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts); fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts); @@ -1936,6 +1961,15 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) } ldlm_register_intent(obd->obd_namespace, mds_intent_policy); + lprocfs_init_vars(mds, &lvars); + if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && + lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) { + /* Init private stats here */ + mds_stats_counter_init(obd->obd_stats); + obd->obd_proc_exports = proc_mkdir("exports", + obd->obd_proc_entry); + } + rc = mds_fs_setup(obd, mnt); if (rc) { CERROR("%s: MDS filesystem method init failed: rc = %d\n", @@ -1943,24 +1977,10 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_ns, rc); } - rc = llog_start_commit_thread(); + rc = mds_lov_presetup(mds, lcfg); if (rc < 0) GOTO(err_fs, rc); - if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { - class_uuid_t uuid; - - ll_generate_random_uuid(uuid); - class_uuid_unparse(uuid, &mds->mds_lov_uuid); - - OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3)); - if (mds->mds_profile == NULL) - GOTO(err_fs, rc = -ENOMEM); - - strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3), - LUSTRE_CFG_BUFLEN(lcfg, 3)); - } - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "mds_ldlm_client", &obd->obd_ldlm_client); obd->obd_replayable = 1; @@ -1969,12 +1989,14 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) if (rc) GOTO(err_fs, rc); +#if 0 mds->mds_group_hash = upcall_cache_init(obd->obd_name); if (IS_ERR(mds->mds_group_hash)) { rc = PTR_ERR(mds->mds_group_hash); mds->mds_group_hash = NULL; GOTO(err_qctxt, rc); } +#endif /* Don't wait for mds_postrecov trying to clear orphans */ obd->obd_async_recov = 1; @@ -1986,15 +2008,6 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) if (rc) GOTO(err_qctxt, rc); - lprocfs_init_vars(mds, &lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && - lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) { - /* Init private stats here */ - mds_stats_counter_init(obd->obd_stats); - obd->obd_proc_exports = proc_mkdir("exports", - obd->obd_proc_entry); - } - uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb); if (uuid_ptr != NULL) { class_uuid_unparse(uuid_ptr, &uuid); @@ -2013,8 +2026,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) "/proc/fs/lustre/mds/%s/recovery_status.\n", obd->obd_name, lustre_cfg_string(lcfg, 1), label ?: "", label ? "/" : "", str, - obd->obd_recoverable_clients, - (obd->obd_recoverable_clients == 1) ? + obd->obd_max_recoverable_clients, + (obd->obd_max_recoverable_clients == 1) ? "client" : "clients", (int)(OBD_RECOVERY_TIMEOUT) / 60, (int)(OBD_RECOVERY_TIMEOUT) % 60, @@ -2036,9 +2049,13 @@ err_qctxt: err_fs: /* No extra cleanup needed for llog_init_commit_thread() */ mds_fs_cleanup(obd); +#if 0 upcall_cache_cleanup(mds->mds_group_hash); mds->mds_group_hash = NULL; +#endif err_ns: + lprocfs_obd_cleanup(obd); + lprocfs_free_obd_stats(obd); ldlm_namespace_free(obd->obd_namespace, 0); obd->obd_namespace = NULL; err_ops: @@ -2087,29 +2104,29 @@ static int mds_postsetup(struct obd_device *obd) int rc = 0; ENTRY; - rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL, + rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL, &llog_lvfs_ops); if (rc) RETURN(rc); - rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL, + rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL, &llog_lvfs_ops); if (rc) RETURN(rc); if (mds->mds_profile) { struct lustre_profile *lprof; - /* The profile defines which osc and mdc to connect to, for a + /* The profile defines which osc and mdc to connect to, for a client. We reuse that here to figure out the name of the - lov to use (and ignore lprof->lp_mdc). - The profile was set in the config log with + lov to use (and ignore lprof->lp_md). + The profile was set in the config log with LCFG_MOUNTOPT profilenm oscnm mdcnm */ lprof = class_get_profile(mds->mds_profile); if (lprof == NULL) { CERROR("No profile found: %s\n", mds->mds_profile); GOTO(err_cleanup, rc = -ENOENT); } - rc = mds_lov_connect(obd, lprof->lp_osc); + rc = mds_lov_connect(obd, lprof->lp_dt); if (rc) GOTO(err_cleanup, rc); } @@ -2144,15 +2161,16 @@ int mds_postrecov(struct obd_device *obd) } /* clean PENDING dir */ - rc = mds_cleanup_pending(obd); - if (rc < 0) - GOTO(out, rc); + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + rc = mds_cleanup_pending(obd); + if (rc < 0) + GOTO(out, rc); /* FIXME Does target_finish_recovery really need this to block? */ /* Notify the LOV, which will in turn call mds_notify for each tgt */ /* This means that we have to hack obd_notify to think we're obd_set_up during mds_lov_connect. */ - obd_notify(obd->u.mds.mds_osc_obd, NULL, + obd_notify(obd->u.mds.mds_osc_obd, NULL, obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC, NULL); @@ -2185,7 +2203,11 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) case OBD_CLEANUP_EARLY: break; case OBD_CLEANUP_EXPORTS: - target_cleanup_recovery(obd); + /*XXX Use this for mdd mds cleanup, so comment out + *this target_cleanup_recovery for this tmp MDD MDS + *Wangdi*/ + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + target_cleanup_recovery(obd); mds_lov_early_clean(obd); break; case OBD_CLEANUP_SELF_EXP: @@ -2222,12 +2244,14 @@ static int mds_cleanup(struct obd_device *obd) lquota_cleanup(mds_quota_interface_ref, obd); mds_update_server_data(obd, 1); - if (mds->mds_lov_objids != NULL) + if (mds->mds_lov_objids != NULL) OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size); mds_fs_cleanup(obd); +#if 0 upcall_cache_cleanup(mds->mds_group_hash); mds->mds_group_hash = NULL; +#endif server_put_mount(obd->obd_name, mds->mds_vfsmnt); obd->u.obt.obt_sb = NULL; @@ -2494,7 +2518,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, RETURN(ELDLM_LOCK_REPLACED); } -static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) +static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { struct mds_obd *mds = &obd->u.mds; struct lprocfs_static_vars lvars; @@ -2533,7 +2557,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, mds_handle, LUSTRE_MDS_NAME, obd->obd_proc_entry, NULL, - mds_min_threads, mds_max_threads, "ll_mdt"); + mds_min_threads, mds_max_threads, "ll_mdt", 0); if (!mds->mds_service) { CERROR("failed to start service\n"); @@ -2551,7 +2575,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) mds_handle, "mds_setattr", obd->obd_proc_entry, NULL, mds_min_threads, mds_max_threads, - "ll_mdt_attr"); + "ll_mdt_attr", 0); if (!mds->mds_setattr_service) { CERROR("failed to start getattr service\n"); GOTO(err_thread, rc = -ENOMEM); @@ -2568,7 +2592,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) mds_handle, "mds_readpage", obd->obd_proc_entry, NULL, MDS_THREADS_MIN_READPAGE, mds_max_threads, - "ll_mdt_rdpg"); + "ll_mdt_rdpg", 0); if (!mds->mds_readpage_service) { CERROR("failed to start readpage service\n"); GOTO(err_thread2, rc = -ENOMEM); @@ -2580,7 +2604,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_thread3, rc); ping_evictor_start(); - + RETURN(0); err_thread3: @@ -2664,7 +2688,6 @@ static int mds_health_check(struct obd_device *obd) LASSERT(mds->mds_health_check_filp != NULL); rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp); #endif - return rc; } @@ -2675,9 +2698,8 @@ static int mds_process_config(struct obd_device *obd, obd_count len, void *buf) int rc; lprocfs_init_vars(mds, &lvars); - + rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd); - return(rc); } @@ -2718,7 +2740,7 @@ static struct obd_ops mdt_obd_ops = { quota_interface_t *mds_quota_interface_ref; extern quota_interface_t mds_quota_interface; -static int __init mds_init(void) +static __attribute__((unused)) int __init mds_init(void) { int rc; struct lprocfs_static_vars lvars; @@ -2732,28 +2754,211 @@ static int __init mds_init(void) return rc; } init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops); - + lprocfs_init_vars(mds, &lvars); - class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME); + class_register_type(&mds_obd_ops, NULL, + lvars.module_vars, LUSTRE_MDS_NAME, NULL); lprocfs_init_vars(mdt, &lvars); - class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME); + mdt_obd_ops = mdt_obd_ops; //make compiler happy +// class_register_type(&mdt_obd_ops, NULL, +// lvars.module_vars, LUSTRE_MDT_NAME, NULL); return 0; } -static void /*__exit*/ mds_exit(void) +static __attribute__((unused)) void /*__exit*/ mds_exit(void) { lquota_exit(mds_quota_interface_ref); if (mds_quota_interface_ref) PORTAL_SYMBOL_PUT(mds_quota_interface); class_unregister_type(LUSTRE_MDS_NAME); - class_unregister_type(LUSTRE_MDT_NAME); +// class_unregister_type(LUSTRE_MDT_NAME); +} +/*mds still need lov setup here*/ +static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg) +{ + struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; + const char *dev; + struct vfsmount *mnt; + struct lustre_sb_info *lsi; + struct lustre_mount_info *lmi; + struct dentry *dentry; + struct file *file; + int rc = 0; + ENTRY; + + CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name); + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + RETURN(0); + + if (lcfg->lcfg_bufcount < 5) { + CERROR("invalid arg for setup %s\n", MDD_OBD_NAME); + RETURN(-EINVAL); + } + dev = lustre_cfg_string(lcfg, 4); + lmi = server_get_mount(dev); + LASSERT(lmi != NULL); + + lsi = s2lsi(lmi->lmi_sb); + mnt = lmi->lmi_mnt; + /* FIXME: MDD LOV initialize objects. + * we need only lmi here but not get mount + * OSD did mount already, so put mount back + */ + atomic_dec(&lsi->lsi_mounts); + mntput(mnt); + + obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); + mds_init_ctxt(obd, mnt); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create OBJECTS directory: rc = %d\n", rc); + GOTO(err_putfs, rc); + } + mds->mds_objects_dir = dentry; + + dentry = lookup_one_len("__iopen__", current->fs->pwd, + strlen("__iopen__")); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc); + GOTO(err_objects, rc); + } + + mds->mds_fid_de = dentry; + if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) { + rc = -ENOENT; + CERROR("__iopen__ directory has no inode? rc = %d\n", rc); + GOTO(err_fid, rc); + } + + /* open and test the lov objd file */ + file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644); + if (IS_ERR(file)) { + rc = PTR_ERR(file); + CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc); + GOTO(err_fid, rc = PTR_ERR(file)); + } + mds->mds_lov_objid_filp = file; + if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { + CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID, + file->f_dentry->d_inode->i_mode); + GOTO(err_lov_objid, rc = -ENOENT); + } + + rc = mds_lov_presetup(mds, lcfg); + if (rc < 0) + GOTO(err_objects, rc); + + /* Don't wait for mds_postrecov trying to clear orphans */ + obd->obd_async_recov = 1; + rc = mds_postsetup(obd); + /* Bug 11557 - allow async abort_recov start + FIXME can remove most of this obd_async_recov plumbing + obd->obd_async_recov = 0; + */ + + if (rc) + GOTO(err_objects, rc); + + mds->mds_max_mdsize = sizeof(struct lov_mds_md); + mds->mds_max_cookiesize = sizeof(struct llog_cookie); + +err_pop: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +err_lov_objid: + if (mds->mds_lov_objid_filp && + filp_close((struct file *)mds->mds_lov_objid_filp, 0)) + CERROR("can't close %s after error\n", LOV_OBJID); +err_fid: + dput(mds->mds_fid_de); +err_objects: + dput(mds->mds_objects_dir); +err_putfs: + fsfilt_put_ops(obd->obd_fsops); + goto err_pop; +} + +static int mds_cmd_cleanup(struct obd_device *obd) +{ + struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; + int rc = 0; + ENTRY; + + if (obd->obd_fail) + LCONSOLE_WARN("%s: shutting down for failover; client state " + "will be preserved.\n", obd->obd_name); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (mds->mds_lov_objid_filp) { + rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0); + mds->mds_lov_objid_filp = NULL; + if (rc) + CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc); + } + if (mds->mds_objects_dir != NULL) { + l_dput(mds->mds_objects_dir); + mds->mds_objects_dir = NULL; + } + + if (mds->mds_lov_objids != NULL) + OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size); + + shrink_dcache_parent(mds->mds_fid_de); + dput(mds->mds_fid_de); + LL_DQUOT_OFF(obd->u.obt.obt_sb); + fsfilt_put_ops(obd->obd_fsops); + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +} + +#if 0 +static int mds_cmd_health_check(struct obd_device *obd) +{ + return 0; +} +#endif +static struct obd_ops mds_cmd_obd_ops = { + .o_owner = THIS_MODULE, + .o_setup = mds_cmd_setup, + .o_cleanup = mds_cmd_cleanup, + .o_precleanup = mds_precleanup, + .o_create = mds_obd_create, + .o_destroy = mds_obd_destroy, + .o_llog_init = mds_llog_init, + .o_llog_finish = mds_llog_finish, + .o_notify = mds_notify, + .o_postrecov = mds_postrecov, + // .o_health_check = mds_cmd_health_check, +}; + +static int __init mds_cmd_init(void) +{ + struct lprocfs_static_vars lvars; + + lprocfs_init_vars(mds, &lvars); + class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars, + LUSTRE_MDS_NAME, NULL); + + return 0; +} + +static void /*__exit*/ mds_cmd_exit(void) +{ + class_unregister_type(LUSTRE_MDS_NAME); } MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS)"); MODULE_LICENSE("GPL"); -module_init(mds_init); -module_exit(mds_exit); +module_init(mds_cmd_init); +module_exit(mds_cmd_exit);