X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=671ac7a82fea9bdf470972b3f9d4f40a283266fb;hp=e2f4c3c2d5441365acb841b774422ec7ca0807c5;hb=99aec0dc17de5f980588c08d8befd0f3119db0ea;hpb=fcb0ebe13fbbf8f4ed9d593387164482a806672c diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index e2f4c3c..671ac7a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -48,9 +46,6 @@ * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -102,8 +97,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = { * Initialized in mdt_mod_init(). */ static unsigned long mdt_num_threads; -static unsigned long mdt_min_threads; -static unsigned long mdt_max_threads; /* ptlrpc request handler for MDT. All handlers are * grouped into several slices - struct mdt_opc_slice, @@ -343,20 +336,20 @@ static int mdt_getstatus(struct mdt_thread_info *info) static int mdt_statfs(struct mdt_thread_info *info) { - struct ptlrpc_request *req = mdt_info_req(info); - struct md_device *next = info->mti_mdt->mdt_child; - struct ptlrpc_service *svc; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_device *next = info->mti_mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + int rc; - ENTRY; + ENTRY; - svc = info->mti_pill->rc_req->rq_rqbd->rqbd_service; + svcpt = info->mti_pill->rc_req->rq_rqbd->rqbd_svcpt; - /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDT_SERVICE_WATCHDOG_FACTOR * - at_get(&svc->srv_at_estimate)) + 1); + /* This will trigger a watchdog timeout */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, + (MDT_SERVICE_WATCHDOG_FACTOR * + at_get(&svcpt->scp_at_estimate)) + 1); rc = mdt_check_ucred(info); if (rc) @@ -370,7 +363,7 @@ static int mdt_statfs(struct mdt_thread_info *info) } if (rc == 0) - mdt_counter_incr(req->rq_export, LPROC_MDT_STATFS); + mdt_counter_incr(req, LPROC_MDT_STATFS); RETURN(rc); } @@ -578,21 +571,35 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } else if (S_ISLNK(la->la_mode) && reqbody->valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; - buffer->lb_len = reqbody->eadatasize; + /* eadatasize from client includes NULL-terminator, so + * there is no need to read it */ + buffer->lb_len = reqbody->eadatasize - 1; rc = mo_readlink(env, next, buffer); if (unlikely(rc <= 0)) { CERROR("readlink failed: %d\n", rc); rc = -EFAULT; } else { - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) - rc -= 2; - repbody->valid |= OBD_MD_LINKNAME; - repbody->eadatasize = rc; - /* NULL terminate */ - ((char*)ma->ma_lmm)[rc - 1] = 0; - CDEBUG(D_INODE, "symlink dest %s, len = %d\n", - (char*)ma->ma_lmm, rc); - rc = 0; + int print_limit = min_t(int, CFS_PAGE_SIZE - 128, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) + rc -= 2; + repbody->valid |= OBD_MD_LINKNAME; + /* we need to report back size with NULL-terminator + * because client expects that */ + repbody->eadatasize = rc + 1; + if (repbody->eadatasize != reqbody->eadatasize) + CERROR("Read shorter symlink %d, expected %d\n", + rc, reqbody->eadatasize - 1); + /* NULL terminate */ + ((char *)ma->ma_lmm)[rc] = 0; + + /* If the total CDEBUG() size is larger than a page, it + * will print a warning to the console, avoid this by + * printing just the last part of the symlink. */ + CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n", + print_limit < rc ? "..." : "", print_limit, + (char *)ma->ma_lmm + rc - print_limit, rc); + rc = 0; } } @@ -664,7 +671,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, out: if (rc == 0) - mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR); + mdt_counter_incr(req, LPROC_MDT_GETATTR); RETURN(rc); } @@ -762,7 +769,6 @@ static int mdt_getattr(struct mdt_thread_info *info) mdt_exit_ucred(info); EXIT; out_shrink: - mdt_client_compatibility(info); rc2 = mdt_fix_reply(info); if (rc == 0) @@ -907,16 +913,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); - rc = mdt_object_exists(parent); - if (unlikely(rc == 0)) { - LU_OBJECT_DEBUG(D_WARNING, info->mti_env, - &parent->mot_obj.mo_lu, - "Parent doesn't exist!\n"); - RETURN(-ESTALE); - } else if (!info->mti_cross_ref) { - LASSERTF(rc > 0, "Parent "DFID" is on remote server\n", - PFID(mdt_object_fid(parent))); - } + rc = mdt_object_exists(parent); + if (unlikely(rc == 0)) { + LU_OBJECT_DEBUG(D_INODE, info->mti_env, + &parent->mot_obj.mo_lu, + "Parent doesn't exist!\n"); + RETURN(-ESTALE); + } else if (!info->mti_cross_ref) { + LASSERTF(rc > 0, "Parent "DFID" is on remote server\n", + PFID(mdt_object_fid(parent))); + } if (lname) { rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); if (rc != 0) { @@ -1739,7 +1745,7 @@ static int mdt_sync(struct mdt_thread_info *info) rc = err_serious(rc); } if (rc == 0) - mdt_counter_incr(req->rq_export, LPROC_MDT_SYNC); + mdt_counter_incr(req, LPROC_MDT_SYNC); RETURN(rc); } @@ -2075,6 +2081,24 @@ static struct mdt_object *mdt_obj(struct lu_object *o) return container_of0(o, struct mdt_object, mot_obj.mo_lu); } +struct mdt_object *mdt_object_new(const struct lu_env *env, + struct mdt_device *d, + const struct lu_fid *f) +{ + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + struct lu_object *o; + struct mdt_object *m; + ENTRY; + + CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f)); + o = lu_object_find(env, &d->mdt_md_dev.md_lu_dev, f, &conf); + if (unlikely(IS_ERR(o))) + m = (struct mdt_object *)o; + else + m = mdt_obj(o); + RETURN(m); +} + struct mdt_object *mdt_object_find(const struct lu_env *env, struct mdt_device *d, const struct lu_fid *f) @@ -2183,7 +2207,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) { struct lu_env env; - rc = lu_env_init(&env, LCT_MD_THREAD); + rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) CWARN("lu_env initialization failed with rc = %d," "cannot start asynchronous commit\n", rc); @@ -3902,73 +3926,90 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m) static int mdt_start_ptlrpc_service(struct mdt_device *m) { - int rc; static struct ptlrpc_service_conf conf; cfs_proc_dir_entry_t *procfs_entry; - ENTRY; - - procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry; - - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = MDS_MAXREQSIZE, - .psc_max_reply_size = MDS_MAXREPSIZE, - .psc_req_portal = MDS_REQUEST_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - /* - * We'd like to have a mechanism to set this on a per-device - * basis, but alas... - */ - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD - }; - - m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client; - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, - "mdt_ldlm_client", m->mdt_ldlm_client); - - m->mdt_regular_service = - ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME, - procfs_entry, target_print_req, - LUSTRE_MDT_NAME); - if (m->mdt_regular_service == NULL) - RETURN(-ENOMEM); - - rc = ptlrpc_start_threads(m->mdt_regular_service); - if (rc) - GOTO(err_mdt_svc, rc); - - /* - * readpage service configuration. Parameters have to be adjusted, - * ideally. - */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = MDS_MAXREQSIZE, - .psc_max_reply_size = MDS_MAXREPSIZE, - .psc_req_portal = MDS_READPAGE_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD - }; - m->mdt_readpage_service = - ptlrpc_init_svc_conf(&conf, mdt_readpage_handle, - LUSTRE_MDT_NAME "_readpage", - procfs_entry, target_print_req,"mdt_rdpg"); - - if (m->mdt_readpage_service == NULL) { - CERROR("failed to start readpage service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); + int rc = 0; + ENTRY; + + m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client; + ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, + "mdt_ldlm_client", m->mdt_ldlm_client); + + procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry; + + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME, + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = MDS_MAXREQSIZE, + .bc_rep_max_size = MDS_MAXREPSIZE, + .bc_req_portal = MDS_REQUEST_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + /* + * We'd like to have a mechanism to set this on a per-device + * basis, but alas... + */ + .psc_thr = { + .tc_thr_name = LUSTRE_MDT_NAME, + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_ops = { + .so_req_handler = mdt_regular_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_regular_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_regular_service)) { + rc = PTR_ERR(m->mdt_regular_service); + CERROR("failed to start regular mdt service: %d\n", rc); + m->mdt_regular_service = NULL; + + RETURN(rc); + } + + /* + * readpage service configuration. Parameters have to be adjusted, + * ideally. + */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_readpage", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = MDS_MAXREQSIZE, + .bc_rep_max_size = MDS_MAXREPSIZE, + .bc_req_portal = MDS_READPAGE_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_rdpg", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_ops = { + .so_req_handler = mdt_readpage_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_readpage_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_readpage_service)) { + rc = PTR_ERR(m->mdt_readpage_service); + CERROR("failed to start readpage service: %d\n", rc); + m->mdt_readpage_service = NULL; + + GOTO(err_mdt_svc, rc); } - rc = ptlrpc_start_threads(m->mdt_readpage_service); - /* * setattr service configuration. * @@ -3976,180 +4017,220 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) * preserve this portal for a certain time, it should be removed * eventually. LU-617. */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = MDS_MAXREQSIZE, - .psc_max_reply_size = MDS_MAXREPSIZE, - .psc_req_portal = MDS_SETATTR_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD - }; - - m->mdt_setattr_service = - ptlrpc_init_svc_conf(&conf, mdt_regular_handle, - LUSTRE_MDT_NAME "_setattr", - procfs_entry, target_print_req,"mdt_attr"); - - if (!m->mdt_setattr_service) { - CERROR("failed to start setattr service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(m->mdt_setattr_service); - if (rc) - GOTO(err_mdt_svc, rc); - - /* - * sequence controller service configuration - */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = SEQ_MAXREQSIZE, - .psc_max_reply_size = SEQ_MAXREPSIZE, - .psc_req_portal = SEQ_CONTROLLER_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD - }; - - m->mdt_mdsc_service = - ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle, - LUSTRE_MDT_NAME"_mdsc", - procfs_entry, target_print_req,"mdt_mdsc"); - if (!m->mdt_mdsc_service) { - CERROR("failed to start seq controller service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(m->mdt_mdsc_service); - if (rc) - GOTO(err_mdt_svc, rc); - - /* - * metadata sequence server service configuration - */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = SEQ_MAXREQSIZE, - .psc_max_reply_size = SEQ_MAXREPSIZE, - .psc_req_portal = SEQ_METADATA_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_setattr", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = MDS_MAXREQSIZE, + .bc_rep_max_size = MDS_MAXREPSIZE, + .bc_req_portal = MDS_SETATTR_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_attr", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_ops = { + .so_req_handler = mdt_regular_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_setattr_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_setattr_service)) { + rc = PTR_ERR(m->mdt_setattr_service); + CERROR("failed to start setattr service: %d\n", rc); + m->mdt_setattr_service = NULL; + + GOTO(err_mdt_svc, rc); + } + + /* + * sequence controller service configuration + */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_mdsc", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = SEQ_MAXREQSIZE, + .bc_rep_max_size = SEQ_MAXREPSIZE, + .bc_req_portal = SEQ_CONTROLLER_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_mdsc", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_ops = { + .so_req_handler = mdt_mdsc_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_mdsc_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_mdsc_service)) { + rc = PTR_ERR(m->mdt_mdsc_service); + CERROR("failed to start seq controller service: %d\n", rc); + m->mdt_mdsc_service = NULL; + + GOTO(err_mdt_svc, rc); + } + + /* + * metadata sequence server service configuration + */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_mdss", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = SEQ_MAXREQSIZE, + .bc_rep_max_size = SEQ_MAXREPSIZE, + .bc_req_portal = SEQ_METADATA_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_mdss", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD + }, + .psc_ops = { + .so_req_handler = mdt_mdss_handle, + .so_req_printer = target_print_req, + }, }; - - m->mdt_mdss_service = - ptlrpc_init_svc_conf(&conf, mdt_mdss_handle, - LUSTRE_MDT_NAME"_mdss", - procfs_entry, target_print_req,"mdt_mdss"); - if (!m->mdt_mdss_service) { - CERROR("failed to start metadata seq server service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(m->mdt_mdss_service); - if (rc) - GOTO(err_mdt_svc, rc); - - - /* - * Data sequence server service configuration. We want to have really - * cluster-wide sequences space. This is why we start only one sequence - * controller which manages space. - */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = SEQ_MAXREQSIZE, - .psc_max_reply_size = SEQ_MAXREPSIZE, - .psc_req_portal = SEQ_DATA_PORTAL, - .psc_rep_portal = OSC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD + m->mdt_mdss_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_mdss_service)) { + rc = PTR_ERR(m->mdt_mdss_service); + CERROR("failed to start metadata seq server service: %d\n", rc); + m->mdt_mdss_service = NULL; + + GOTO(err_mdt_svc, rc); + } + + /* + * Data sequence server service configuration. We want to have really + * cluster-wide sequences space. This is why we start only one sequence + * controller which manages space. + */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_dtss", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = SEQ_MAXREQSIZE, + .bc_rep_max_size = SEQ_MAXREPSIZE, + .bc_req_portal = SEQ_DATA_PORTAL, + .bc_rep_portal = OSC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_dtss", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD + }, + .psc_ops = { + .so_req_handler = mdt_dtss_handle, + .so_req_printer = target_print_req, + }, }; - - m->mdt_dtss_service = - ptlrpc_init_svc_conf(&conf, mdt_dtss_handle, - LUSTRE_MDT_NAME"_dtss", - procfs_entry, target_print_req,"mdt_dtss"); - if (!m->mdt_dtss_service) { - CERROR("failed to start data seq server service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(m->mdt_dtss_service); - if (rc) - GOTO(err_mdt_svc, rc); - - /* FLD service start */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = FLD_MAXREQSIZE, - .psc_max_reply_size = FLD_MAXREPSIZE, - .psc_req_portal = FLD_REQUEST_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, + m->mdt_dtss_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_dtss_service)) { + rc = PTR_ERR(m->mdt_dtss_service); + CERROR("failed to start data seq server service: %d\n", rc); + m->mdt_dtss_service = NULL; + + GOTO(err_mdt_svc, rc); + } + + /* FLD service start */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_fld", .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_DT_THREAD|LCT_MD_THREAD - }; - - m->mdt_fld_service = - ptlrpc_init_svc_conf(&conf, mdt_fld_handle, - LUSTRE_MDT_NAME"_fld", - procfs_entry, target_print_req, "mdt_fld"); - if (!m->mdt_fld_service) { - CERROR("failed to start fld service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = FLD_MAXREQSIZE, + .bc_rep_max_size = FLD_MAXREPSIZE, + .bc_req_portal = FLD_REQUEST_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_fld", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD + }, + .psc_ops = { + .so_req_handler = mdt_fld_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_fld_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_fld_service)) { + rc = PTR_ERR(m->mdt_fld_service); + CERROR("failed to start fld service: %d\n", rc); + m->mdt_fld_service = NULL; + + GOTO(err_mdt_svc, rc); + } + + /* + * mds-mds service configuration. Separate portal is used to allow + * mds-mds requests be not blocked during recovery. + */ + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_mds", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = MDS_MAXREQSIZE, + .bc_rep_max_size = MDS_MAXREPSIZE, + .bc_req_portal = MDS_MDS_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "mdt_mds", + .tc_nthrs_min = MDT_MIN_THREADS, + .tc_nthrs_max = MDT_MAX_THREADS, + .tc_nthrs_user = mdt_num_threads, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_ops = { + .so_req_handler = mdt_xmds_handle, + .so_req_printer = target_print_req, + }, + }; + m->mdt_xmds_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mdt_xmds_service)) { + rc = PTR_ERR(m->mdt_xmds_service); + CERROR("failed to start xmds service: %d\n", rc); + m->mdt_xmds_service = NULL; + + GOTO(err_mdt_svc, rc); } - rc = ptlrpc_start_threads(m->mdt_fld_service); - if (rc) - GOTO(err_mdt_svc, rc); - - /* - * mds-mds service configuration. Separate portal is used to allow - * mds-mds requests be not blocked during recovery. - */ - conf = (typeof(conf)) { - .psc_nbufs = MDS_NBUFS, - .psc_bufsize = MDS_BUFSIZE, - .psc_max_req_size = MDS_MAXREQSIZE, - .psc_max_reply_size = MDS_MAXREPSIZE, - .psc_req_portal = MDS_MDS_PORTAL, - .psc_rep_portal = MDC_REPLY_PORTAL, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_min_threads = mdt_min_threads, - .psc_max_threads = mdt_max_threads, - .psc_ctx_tags = LCT_MD_THREAD - }; - m->mdt_xmds_service = - ptlrpc_init_svc_conf(&conf, mdt_xmds_handle, - LUSTRE_MDT_NAME "_mds", - procfs_entry, target_print_req,"mdt_xmds"); - - if (m->mdt_xmds_service == NULL) { - CERROR("failed to start xmds service\n"); - GOTO(err_mdt_svc, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(m->mdt_xmds_service); - if (rc) - GOTO(err_mdt_svc, rc); - EXIT; err_mdt_svc: if (rc) @@ -4886,7 +4967,8 @@ static const struct lu_object_operations mdt_obj_ops = { .loo_object_print = mdt_object_print }; -static int mdt_obd_set_info_async(struct obd_export *exp, +static int mdt_obd_set_info_async(const struct lu_env *env, + struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) @@ -5063,14 +5145,12 @@ static int mdt_obd_connect(const struct lu_env *env, rc = mdt_connect_internal(lexp, mdt, data); if (rc == 0) { - struct mdt_thread_info *mti; struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; + LASSERT(lcd); - mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - LASSERT(mti != NULL); - mti->mti_exp = lexp; - memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); - rc = mdt_client_new(env, mdt); + info->mti_exp = lexp; + memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); + rc = lut_client_new(env, lexp); if (rc == 0) mdt_export_stats_init(obd, lexp, localdata); } @@ -5115,6 +5195,7 @@ static int mdt_obd_reconnect(const struct lu_env *env, RETURN(rc); } + static int mdt_export_cleanup(struct obd_export *exp) { struct mdt_export_data *med = &exp->exp_mdt_data; @@ -5194,7 +5275,7 @@ out_lmm: /* cleanup client slot early */ /* Do not erase record for recoverable client. */ if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) - mdt_client_del(&env, mdt); + lut_client_del(&env, exp); lu_env_fini(&env); RETURN(rc); @@ -5238,12 +5319,21 @@ static int mdt_init_export(struct obd_export *exp) RETURN(0); rc = lut_client_alloc(exp); - if (rc == 0) - rc = ldlm_init_export(exp); if (rc) - CERROR("%s: Error %d while initializing export\n", - exp->exp_obd->obd_name, rc); + GOTO(err, rc); + + rc = ldlm_init_export(exp); + if (rc) + GOTO(err_free, rc); + RETURN(rc); + +err_free: + lut_client_free(exp); +err: + CERROR("%s: Error %d while initializing export\n", + exp->exp_obd->obd_name, rc); + return rc; } static int mdt_destroy_export(struct obd_export *exp) @@ -5755,7 +5845,7 @@ void mdt_enable_cos(struct mdt_device *mdt, int val) int rc; mdt->mdt_opts.mo_cos = !!val; - rc = lu_env_init(&env, LCT_MD_THREAD); + rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { CWARN("lu_env initialization failed with rc = %d," "cannot sync\n", rc); @@ -5799,30 +5889,11 @@ static struct lu_device_type mdt_device_type = { .ldt_ctx_tags = LCT_MD_THREAD }; -static struct lu_local_obj_desc mdt_last_recv = { - .llod_name = LAST_RCVD, - .llod_oid = MDT_LAST_RECV_OID, - .llod_is_index = 0, -}; - static int __init mdt_mod_init(void) { struct lprocfs_static_vars lvars; int rc; - llo_local_obj_register(&mdt_last_recv); - - if (mdt_num_threads > 0) { - if (mdt_num_threads > MDT_MAX_THREADS) - mdt_num_threads = MDT_MAX_THREADS; - if (mdt_num_threads < MDT_MIN_THREADS) - mdt_num_threads = MDT_MIN_THREADS; - mdt_max_threads = mdt_min_threads = mdt_num_threads; - } else { - mdt_max_threads = MDT_MAX_THREADS; - mdt_min_threads = MDT_MIN_THREADS; - } - lprocfs_mdt_init_vars(&lvars); rc = class_register_type(&mdt_obd_device_ops, NULL, lvars.module_vars, LUSTRE_MDT_NAME, @@ -5833,7 +5904,6 @@ static int __init mdt_mod_init(void) static void __exit mdt_mod_exit(void) { - llo_local_obj_unregister(&mdt_last_recv); class_unregister_type(LUSTRE_MDT_NAME); }