X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=61559bee6642df657614de93f68aed0eb512e9fe;hp=6a5128258a2cb67cae24644c9183024d83936de2;hb=f5ecf580adc319fd9abe41bd79389b50015d5c13;hpb=7186c88d389c236f5fe887e3c7f35f2249f8aa86 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 6a51282..61559be 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2010, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -91,102 +91,8 @@ ldlm_mode_t mdt_dlm_lock_modes[] = { [MDL_GROUP] = LCK_GROUP }; -/* - * Initialized in mdt_mod_init(). - */ -static unsigned long mdt_num_threads; -CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444, - "number of MDS service threads to start " - "(deprecated in favor of mds_num_threads)"); - -static unsigned long mds_num_threads; -CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444, - "number of MDS service threads to start"); - -static char *mds_num_cpts; -CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444, - "CPU partitions MDS threads should run on"); - -static unsigned long mds_rdpg_num_threads; -CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444, - "number of MDS readpage service threads to start"); - -static char *mds_rdpg_num_cpts; -CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444, - "CPU partitions MDS readpage threads should run on"); - -/* NB: these two should be removed along with setattr service in the future */ -static unsigned long mds_attr_num_threads; -CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444, - "number of MDS setattr service threads to start"); - -static char *mds_attr_num_cpts; -CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444, - "CPU partitions MDS setattr threads should run on"); - -/* ptlrpc request handler for MDT. All handlers are - * grouped into several slices - struct mdt_opc_slice, - * and stored in an array - mdt_handlers[]. - */ -struct mdt_handler { - /* The name of this handler. */ - const char *mh_name; - /* Fail id for this handler, checked at the beginning of this handler*/ - int mh_fail_id; - /* Operation code for this handler */ - __u32 mh_opc; - /* flags are listed in enum mdt_handler_flags below. */ - __u32 mh_flags; - /* The actual handler function to execute. */ - int (*mh_act)(struct mdt_thread_info *info); - /* Request format for this request. */ - const struct req_format *mh_fmt; -}; - -enum mdt_handler_flags { - /* - * struct mdt_body is passed in the incoming message, and object - * identified by this fid exists on disk. - * - * "habeo corpus" == "I have a body" - */ - HABEO_CORPUS = (1 << 0), - /* - * struct ldlm_request is passed in the incoming message. - * - * "habeo clavis" == "I have a key" - */ - HABEO_CLAVIS = (1 << 1), - /* - * this request has fixed reply format, so that reply message can be - * packed by generic code. - * - * "habeo refero" == "I have a reply" - */ - HABEO_REFERO = (1 << 2), - /* - * this request will modify something, so check whether the filesystem - * is readonly or not, then return -EROFS to client asap if necessary. - * - * "mutabor" == "I shall modify" - */ - MUTABOR = (1 << 3) -}; - -struct mdt_opc_slice { - __u32 mos_opc_start; - int mos_opc_end; - struct mdt_handler *mos_hs; -}; - -static struct mdt_opc_slice mdt_regular_handlers[]; -static struct mdt_opc_slice mdt_readpage_handlers[]; -static struct mdt_opc_slice mdt_xmds_handlers[]; -static struct mdt_opc_slice mdt_seq_handlers[]; -static struct mdt_opc_slice mdt_fld_handlers[]; static struct mdt_device *mdt_dev(struct lu_device *d); -static int mdt_regular_handle(struct ptlrpc_request *req); static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt, struct getinfo_fid2path *fp); @@ -329,7 +235,7 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o EXIT; } -static int mdt_getstatus(struct mdt_thread_info *info) +int mdt_getstatus(struct mdt_thread_info *info) { struct mdt_device *mdt = info->mti_mdt; struct md_device *next = mdt->mdt_child; @@ -374,7 +280,7 @@ static int mdt_getstatus(struct mdt_thread_info *info) RETURN(rc); } -static int mdt_statfs(struct mdt_thread_info *info) +int mdt_statfs(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); struct md_device *next = info->mti_mdt->mdt_child; @@ -409,21 +315,21 @@ static int mdt_statfs(struct mdt_thread_info *info) rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs); if (rc) RETURN(rc); - cfs_spin_lock(&info->mti_mdt->mdt_osfs_lock); + spin_lock(&info->mti_mdt->mdt_osfs_lock); info->mti_mdt->mdt_osfs = *osfs; info->mti_mdt->mdt_osfs_age = cfs_time_current_64(); - cfs_spin_unlock(&info->mti_mdt->mdt_osfs_lock); + spin_unlock(&info->mti_mdt->mdt_osfs_lock); } else { /** use cached statfs data */ - cfs_spin_lock(&info->mti_mdt->mdt_osfs_lock); + spin_lock(&info->mti_mdt->mdt_osfs_lock); *osfs = info->mti_mdt->mdt_osfs; - cfs_spin_unlock(&info->mti_mdt->mdt_osfs_lock); + spin_unlock(&info->mti_mdt->mdt_osfs_lock); } - if (rc == 0) + if (rc == 0) mdt_counter_incr(req, LPROC_MDT_STATFS); - RETURN(rc); + RETURN(rc); } /** @@ -542,19 +448,15 @@ void mdt_client_compatibility(struct mdt_thread_info *info) EXIT; } -static int mdt_big_lmm_get(const struct lu_env *env, struct mdt_object *o, - struct md_attr *ma) +static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, + char *name) { - struct mdt_thread_info *info; + const struct lu_env *env = info->mti_env; int rc; ENTRY; - info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - LASSERT(info != NULL); - LASSERT(ma->ma_lmm_size > 0); LASSERT(info->mti_big_lmm_used == 0); - rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, - XATTR_NAME_LOV); + rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name); if (rc < 0) RETURN(rc); @@ -580,21 +482,9 @@ static int mdt_big_lmm_get(const struct lu_env *env, struct mdt_object *o, info->mti_buf.lb_buf = info->mti_big_lmm; info->mti_buf.lb_len = info->mti_big_lmmsize; - rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, - XATTR_NAME_LOV); - if (rc < 0) - RETURN(rc); + rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name); - info->mti_big_lmm_used = 1; - ma->ma_valid |= MA_LOV; - ma->ma_lmm = info->mti_big_lmm; - ma->ma_lmm_size = rc; - - /* update mdt_max_mdsize so all clients will be aware about that */ - if (info->mti_mdt->mdt_max_mdsize < rc) - info->mti_mdt->mdt_max_mdsize = rc; - - RETURN(0); + RETURN(rc); } int mdt_attr_get_lov(struct mdt_thread_info *info, @@ -615,12 +505,70 @@ int mdt_attr_get_lov(struct mdt_thread_info *info, /* no LOV EA */ rc = 0; } else if (rc == -ERANGE) { - rc = mdt_big_lmm_get(info->mti_env, o, ma); + rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV); + if (rc > 0) { + info->mti_big_lmm_used = 1; + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = rc; + /* update mdt_max_mdsize so all clients + * will be aware about that */ + if (info->mti_mdt->mdt_max_mdsize < rc) + info->mti_mdt->mdt_max_mdsize = rc; + rc = 0; + } } return rc; } +int mdt_attr_get_pfid(struct mdt_thread_info *info, + struct mdt_object *o, struct lu_fid *pfid) +{ + struct lu_buf *buf = &info->mti_buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int rc; + ENTRY; + + buf->lb_buf = info->mti_big_lmm; + buf->lb_len = info->mti_big_lmmsize; + rc = mo_xattr_get(info->mti_env, mdt_object_child(o), + buf, XATTR_NAME_LINK); + /* ignore errors, MA_PFID won't be set and it is + * up to the caller to treat this as an error */ + if (rc == -ERANGE || buf->lb_len == 0) { + rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); + buf->lb_buf = info->mti_big_lmm; + buf->lb_len = info->mti_big_lmmsize; + } + + if (rc < 0) + RETURN(rc); + if (rc < sizeof(*leh)) { + CERROR("short LinkEA on "DFID": rc = %d\n", + PFID(mdt_object_fid(o)), rc); + RETURN(-ENODATA); + } + + leh = (struct link_ea_header *) buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + } + if (leh->leh_magic != LINK_EA_MAGIC) + RETURN(-EINVAL); + if (leh->leh_reccount == 0) + RETURN(-ENODATA); + + memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid)); + fid_be_to_cpu(pfid, pfid); + + RETURN(0); +} + int mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -632,9 +580,6 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, int rc = 0, rc2; ENTRY; - /* do we really need PFID */ - LASSERT((ma->ma_need & MA_PFID) == 0); - ma->ma_valid = 0; if (need & MA_INODE) { @@ -645,6 +590,14 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, ma->ma_valid |= MA_INODE; } + if (need & MA_PFID) { + rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid); + if (rc == 0) + ma->ma_valid |= MA_PFID; + /* ignore this error, parent fid is not mandatory */ + rc = 0; + } + if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { rc = mdt_attr_get_lov(info, o, ma); if (rc) @@ -665,40 +618,30 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc = rc2); } + if (need & MA_SOM && S_ISREG(mode)) { + buf->lb_buf = info->mti_xattr_buf; + buf->lb_len = sizeof(info->mti_xattr_buf); + CLASSERT(sizeof(struct som_attrs) <= + sizeof(info->mti_xattr_buf)); + rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_SOM); + rc2 = lustre_buf2som(info->mti_xattr_buf, rc2, ma->ma_som); + if (rc2 == 0) + ma->ma_valid |= MA_SOM; + else if (rc2 < 0 && rc2 != -ENODATA) + GOTO(out, rc = rc2); + } - if (rc == 0 && S_ISREG(mode) && (need & (MA_HSM | MA_SOM))) { - struct lustre_mdt_attrs *lma; - - lma = (struct lustre_mdt_attrs *)info->mti_xattr_buf; - CLASSERT(sizeof(*lma) <= sizeof(info->mti_xattr_buf)); - - buf->lb_buf = lma; + if (need & MA_HSM && S_ISREG(mode)) { + buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); - rc = mo_xattr_get(env, next, buf, XATTR_NAME_LMA); - if (rc > 0) { - lustre_lma_swab(lma); - /* Swab and copy LMA */ - if (need & MA_HSM) { - if (lma->lma_compat & LMAC_HSM) - ma->ma_hsm.mh_flags = - lma->lma_flags & HSM_FLAGS_MASK; - else - ma->ma_hsm.mh_flags = 0; - ma->ma_valid |= MA_HSM; - } - /* Copy SOM */ - if (need & MA_SOM && lma->lma_compat & LMAC_SOM) { - LASSERT(ma->ma_som != NULL); - ma->ma_som->msd_ioepoch = lma->lma_ioepoch; - ma->ma_som->msd_size = lma->lma_som_size; - ma->ma_som->msd_blocks = lma->lma_som_blocks; - ma->ma_som->msd_mountid = lma->lma_som_mountid; - ma->ma_valid |= MA_SOM; - } - rc = 0; - } else if (rc == -ENODATA) { - rc = 0; - } + CLASSERT(sizeof(struct hsm_attrs) <= + sizeof(info->mti_xattr_buf)); + rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM); + rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm); + if (rc2 == 0) + ma->ma_valid |= MA_HSM; + else if (rc2 < 0 && rc2 != -ENODATA) + GOTO(out, rc = rc2); } #ifdef CONFIG_FS_POSIX_ACL @@ -975,7 +918,7 @@ static int mdt_renew_capa(struct mdt_thread_info *info) RETURN(rc); } -static int mdt_getattr(struct mdt_thread_info *info) +int mdt_getattr(struct mdt_thread_info *info) { struct mdt_object *obj = info->mti_object; struct req_capsule *pill = info->mti_pill; @@ -1023,7 +966,6 @@ static int mdt_getattr(struct mdt_thread_info *info) if (unlikely(rc)) GOTO(out_shrink, rc); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); /* @@ -1043,7 +985,7 @@ out_shrink: return rc; } -static int mdt_is_subdir(struct mdt_thread_info *info) +int mdt_is_subdir(struct mdt_thread_info *info) { struct mdt_object *o = info->mti_object; struct req_capsule *pill = info->mti_pill; @@ -1391,7 +1333,7 @@ out_parent: } /* normal handler: should release the child lock */ -static int mdt_getattr_name(struct mdt_thread_info *info) +int mdt_getattr_name(struct mdt_thread_info *info) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; struct mdt_body *reqbody; @@ -1404,7 +1346,6 @@ static int mdt_getattr_name(struct mdt_thread_info *info) repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody != NULL); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); repbody->eadatasize = 0; repbody->aclsize = 0; @@ -1428,17 +1369,10 @@ out_shrink: return rc; } -static const struct lu_device_operations mdt_lu_ops; - -static int lu_device_is_mdt(struct lu_device *d) -{ - return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); -} - static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg); -static int mdt_set_info(struct mdt_thread_info *info) +int mdt_set_info(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); char *key; @@ -1473,12 +1407,12 @@ static int mdt_set_info(struct mdt_thread_info *info) req->rq_status = 0; lustre_msg_set_status(req->rq_repmsg, 0); - cfs_spin_lock(&req->rq_export->exp_lock); - if (*(__u32 *)val) - req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY; - else - req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY; - cfs_spin_unlock(&req->rq_export->exp_lock); + spin_lock(&req->rq_export->exp_lock); + if (*(__u32 *)val) + req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY; + else + req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY; + spin_unlock(&req->rq_export->exp_lock); } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) { struct changelog_setinfo *cs = @@ -1502,28 +1436,46 @@ static int mdt_set_info(struct mdt_thread_info *info) RETURN(0); } -static int mdt_connect(struct mdt_thread_info *info) +/** + * Top-level handler for MDT connection requests. + */ +int mdt_connect(struct mdt_thread_info *info) { - int rc; - struct ptlrpc_request *req; + int rc; + struct obd_connect_data *reply; + struct obd_export *exp; + struct ptlrpc_request *req = mdt_info_req(info); - req = mdt_info_req(info); - rc = target_handle_connect(req); - if (rc == 0) { - LASSERT(req->rq_export != NULL); - info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - rc = mdt_init_sec_level(info); - if (rc == 0) - rc = mdt_init_idmap(info); - if (rc != 0) - obd_disconnect(class_export_get(req->rq_export)); - } else { - rc = err_serious(rc); - } - return rc; + rc = target_handle_connect(req); + if (rc != 0) + return err_serious(rc); + + LASSERT(req->rq_export != NULL); + info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); + rc = mdt_init_sec_level(info); + if (rc != 0) { + obd_disconnect(class_export_get(req->rq_export)); + return rc; + } + + /* To avoid exposing partially initialized connection flags, changes up + * to this point have been staged in reply->ocd_connect_flags. Now that + * connection handling has completed successfully, atomically update + * the connect flags in the shared export data structure. LU-1623 */ + reply = req_capsule_server_get(info->mti_pill, &RMF_CONNECT_DATA); + exp = req->rq_export; + spin_lock(&exp->exp_lock); + exp->exp_connect_flags = reply->ocd_connect_flags; + spin_unlock(&exp->exp_lock); + + rc = mdt_init_idmap(info); + if (rc != 0) + obd_disconnect(class_export_get(req->rq_export)); + + return rc; } -static int mdt_disconnect(struct mdt_thread_info *info) +int mdt_disconnect(struct mdt_thread_info *info) { int rc; ENTRY; @@ -1569,181 +1521,7 @@ static int mdt_sendpage(struct mdt_thread_info *info, RETURN(rc); } -#ifdef HAVE_SPLIT_SUPPORT -/* - * Retrieve dir entry from the page and insert it to the slave object, actually, - * this should be in osd layer, but since it will not in the final product, so - * just do it here and do not define more moo api anymore for this. - */ -static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page, - int size) -{ - struct mdt_object *object = info->mti_object; - struct lu_fid *lf = &info->mti_tmp_fid2; - struct md_attr *ma = &info->mti_attr; - struct lu_dirpage *dp; - struct lu_dirent *ent; - int rc = 0, offset = 0; - ENTRY; - - /* Make sure we have at least one entry. */ - if (size == 0) - RETURN(-EINVAL); - - /* - * Disable trans for this name insert, since it will include many trans - * for this. - */ - info->mti_no_need_trans = 1; - /* - * When write_dir_page, no need update parent's ctime, - * and no permission check for name_insert. - */ - ma->ma_attr.la_ctime = 0; - ma->ma_attr.la_valid = LA_MODE; - ma->ma_valid = MA_INODE; - - cfs_kmap(page); - dp = page_address(page); - offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp); - - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - struct lu_name *lname; - char *name; - - if (le16_to_cpu(ent->lde_namelen) == 0) - continue; - - fid_le_to_cpu(lf, &ent->lde_fid); - if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT) - ma->ma_attr.la_mode = S_IFDIR; - else - ma->ma_attr.la_mode = 0; - OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1); - if (name == NULL) - GOTO(out, rc = -ENOMEM); - - memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen)); - lname = mdt_name(info->mti_env, name, - le16_to_cpu(ent->lde_namelen)); - ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE); - rc = mdo_name_insert(info->mti_env, - md_object_next(&object->mot_obj), - lname, lf, ma); - OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1); - if (rc) { - CERROR("Can't insert %*.*s, rc %d\n", - le16_to_cpu(ent->lde_namelen), - le16_to_cpu(ent->lde_namelen), - ent->lde_name, rc); - GOTO(out, rc); - } - - offset += lu_dirent_size(ent); - if (offset >= size) - break; - } - EXIT; -out: - cfs_kunmap(page); - return rc; -} - -static int mdt_bulk_timeout(void *data) -{ - ENTRY; - - CERROR("mdt bulk transfer timeout \n"); - - RETURN(1); -} - -static int mdt_writepage(struct mdt_thread_info *info) -{ - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *reqbody; - struct l_wait_info *lwi; - struct ptlrpc_bulk_desc *desc; - struct page *page; - int rc; - ENTRY; - - - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - if (reqbody == NULL) - RETURN(err_serious(-EFAULT)); - - desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL); - if (desc == NULL) - RETURN(err_serious(-ENOMEM)); - - /* allocate the page for the desc */ - page = cfs_alloc_page(CFS_ALLOC_STD); - if (page == NULL) - GOTO(desc_cleanup, rc = -ENOMEM); - - CDEBUG(D_INFO, "Received page offset %d size %d \n", - (int)reqbody->size, (int)reqbody->nlink); - - ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size, - (int)reqbody->nlink); - - rc = sptlrpc_svc_prep_bulk(req, desc); - if (rc != 0) - GOTO(cleanup_page, rc); - /* - * Check if client was evicted while we were doing i/o before touching - * network. - */ - OBD_ALLOC_PTR(lwi); - if (!lwi) - GOTO(cleanup_page, rc = -ENOMEM); - - if (desc->bd_export->exp_failed) - rc = -ENOTCONN; - else - rc = ptlrpc_start_bulk_transfer (desc); - if (rc == 0) { - *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * CFS_HZ / 4, CFS_HZ, - mdt_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) || - desc->bd_export->exp_failed, lwi); - LASSERT(rc == 0 || rc == -ETIMEDOUT); - if (rc == -ETIMEDOUT) { - DEBUG_REQ(D_ERROR, req, "timeout on bulk GET"); - ptlrpc_abort_bulk(desc); - } else if (desc->bd_export->exp_failed) { - DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET"); - rc = -ENOTCONN; - ptlrpc_abort_bulk(desc); - } else if (!desc->bd_success || - desc->bd_nob_transferred != desc->bd_nob) { - DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)", - desc->bd_success ? - "truncated" : "network error on", - desc->bd_nob_transferred, desc->bd_nob); - /* XXX should this be a different errno? */ - rc = -ETIMEDOUT; - } - } else { - DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc); - } - if (rc) - GOTO(cleanup_lwi, rc); - rc = mdt_write_dir_page(info, page, reqbody->nlink); - -cleanup_lwi: - OBD_FREE_PTR(lwi); -cleanup_page: - cfs_free_page(page); -desc_cleanup: - ptlrpc_free_bulk_pin(desc); - RETURN(rc); -} -#endif - -static int mdt_readpage(struct mdt_thread_info *info) +int mdt_readpage(struct mdt_thread_info *info) { struct mdt_object *object = info->mti_object; struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg; @@ -1902,7 +1680,7 @@ static long mdt_reint_opcode(struct mdt_thread_info *info, return opc; } -static int mdt_reint(struct mdt_thread_info *info) +int mdt_reint(struct mdt_thread_info *info) { long opc; int rc; @@ -1963,7 +1741,7 @@ static int mdt_object_sync(struct mdt_thread_info *info) RETURN(rc); } -static int mdt_sync(struct mdt_thread_info *info) +int mdt_sync(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); struct req_capsule *pill = info->mti_pill; @@ -2021,7 +1799,7 @@ static int mdt_sync(struct mdt_thread_info *info) * Quotacheck handler. * in-kernel quotacheck isn't supported any more. */ -static int mdt_quotacheck(struct mdt_thread_info *info) +int mdt_quotacheck(struct mdt_thread_info *info) { struct obd_quotactl *oqctl; int rc; @@ -2043,7 +1821,7 @@ static int mdt_quotacheck(struct mdt_thread_info *info) * Handle quota control requests to consult current usage/limit, but also * to configure quota enforcement */ -static int mdt_quotactl(struct mdt_thread_info *info) +int mdt_quotactl(struct mdt_thread_info *info) { struct obd_export *exp = info->mti_exp; struct req_capsule *pill = info->mti_pill; @@ -2150,7 +1928,7 @@ static int mdt_quotactl(struct mdt_thread_info *info) /* * OBD PING and other handlers. */ -static int mdt_obd_ping(struct mdt_thread_info *info) +int mdt_obd_ping(struct mdt_thread_info *info) { int rc; ENTRY; @@ -2166,7 +1944,7 @@ static int mdt_obd_ping(struct mdt_thread_info *info) /* * OBD_IDX_READ handler */ -static int mdt_obd_idx_read(struct mdt_thread_info *info) +int mdt_obd_idx_read(struct mdt_thread_info *info) { struct mdt_device *mdt = info->mti_mdt; struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg; @@ -2246,17 +2024,16 @@ out: return rc; } -static int mdt_obd_log_cancel(struct mdt_thread_info *info) +int mdt_obd_log_cancel(struct mdt_thread_info *info) { return err_serious(-EOPNOTSUPP); } -static int mdt_obd_qc_callback(struct mdt_thread_info *info) +int mdt_obd_qc_callback(struct mdt_thread_info *info) { return err_serious(-EOPNOTSUPP); } - /* * LLOG handlers. */ @@ -2305,7 +2082,7 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env, return 0; } -static int mdt_llog_create(struct mdt_thread_info *info) +int mdt_llog_create(struct mdt_thread_info *info) { int rc; @@ -2314,7 +2091,7 @@ static int mdt_llog_create(struct mdt_thread_info *info) return (rc < 0 ? err_serious(rc) : rc); } -static int mdt_llog_destroy(struct mdt_thread_info *info) +int mdt_llog_destroy(struct mdt_thread_info *info) { int rc; @@ -2323,7 +2100,7 @@ static int mdt_llog_destroy(struct mdt_thread_info *info) return (rc < 0 ? err_serious(rc) : rc); } -static int mdt_llog_read_header(struct mdt_thread_info *info) +int mdt_llog_read_header(struct mdt_thread_info *info) { int rc; @@ -2332,7 +2109,7 @@ static int mdt_llog_read_header(struct mdt_thread_info *info) return (rc < 0 ? err_serious(rc) : rc); } -static int mdt_llog_next_block(struct mdt_thread_info *info) +int mdt_llog_next_block(struct mdt_thread_info *info) { int rc; @@ -2341,7 +2118,7 @@ static int mdt_llog_next_block(struct mdt_thread_info *info) return (rc < 0 ? err_serious(rc) : rc); } -static int mdt_llog_prev_block(struct mdt_thread_info *info) +int mdt_llog_prev_block(struct mdt_thread_info *info) { int rc; @@ -2360,7 +2137,7 @@ static struct ldlm_callback_suite cbs = { .lcs_glimpse = ldlm_server_glimpse_ast }; -static int mdt_enqueue(struct mdt_thread_info *info) +int mdt_enqueue(struct mdt_thread_info *info) { struct ptlrpc_request *req; int rc; @@ -2378,7 +2155,7 @@ static int mdt_enqueue(struct mdt_thread_info *info) return rc ? err_serious(rc) : req->rq_status; } -static int mdt_convert(struct mdt_thread_info *info) +int mdt_convert(struct mdt_thread_info *info) { int rc; struct ptlrpc_request *req; @@ -2389,14 +2166,14 @@ static int mdt_convert(struct mdt_thread_info *info) return rc ? err_serious(rc) : req->rq_status; } -static int mdt_bl_callback(struct mdt_thread_info *info) +int mdt_bl_callback(struct mdt_thread_info *info) { CERROR("bl callbacks should not happen on MDS\n"); LBUG(); return err_serious(-EOPNOTSUPP); } -static int mdt_cp_callback(struct mdt_thread_info *info) +int mdt_cp_callback(struct mdt_thread_info *info) { CERROR("cp callbacks should not happen on MDS\n"); LBUG(); @@ -2406,7 +2183,7 @@ static int mdt_cp_callback(struct mdt_thread_info *info) /* * sec context handlers */ -static int mdt_sec_ctx_handle(struct mdt_thread_info *info) +int mdt_sec_ctx_handle(struct mdt_thread_info *info) { int rc; @@ -2429,7 +2206,7 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info) /* * quota request handlers */ -static int mdt_quota_dqacq(struct mdt_thread_info *info) +int mdt_quota_dqacq(struct mdt_thread_info *info) { struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev; int rc; @@ -3082,7 +2859,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, struct mdt_thread_info *info) { int i; - struct md_capainfo *ci; req_capsule_init(&req->rq_pill, req, RCL_SERVER); info->mti_pill = &req->rq_pill; @@ -3098,18 +2874,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, } else info->mti_mdt = NULL; info->mti_env = req->rq_svc_thread->t_env; - ci = md_capainfo(info->mti_env); - memset(ci, 0, sizeof *ci); - if (req->rq_export) { - if (exp_connect_rmtclient(req->rq_export)) - ci->mc_auth = LC_ID_CONVERT; - else if (req->rq_export->exp_connect_flags & - OBD_CONNECT_MDS_CAPA) - ci->mc_auth = LC_ID_PLAIN; - else - ci->mc_auth = LC_ID_NONE; - } - info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET; info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg); info->mti_mos = NULL; @@ -3124,7 +2888,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_big_lmm_used = 0; /* To not check for split by default. */ - info->mti_spec.sp_ck_split = 0; info->mti_spec.no_create = 0; } @@ -3150,6 +2913,7 @@ static int mdt_filter_recovery_request(struct ptlrpc_request *req, case OST_CONNECT: /* This will never get here, but for completeness. */ case MDS_DISCONNECT: case OST_DISCONNECT: + case OBD_IDX_READ: *process = 1; RETURN(0); @@ -3380,8 +3144,8 @@ static int mdt_handle0(struct ptlrpc_request *req, * XXX common "target" functionality should be factored into separate module * shared by mdt, ost and stand-alone services like fld. */ -static int mdt_handle_common(struct ptlrpc_request *req, - struct mdt_opc_slice *supported) +int mdt_handle_common(struct ptlrpc_request *req, + struct mdt_opc_slice *supported) { struct lu_env *env; struct mdt_thread_info *info; @@ -3389,6 +3153,11 @@ static int mdt_handle_common(struct ptlrpc_request *req, ENTRY; env = req->rq_svc_thread->t_env; + /* Refill(initilize) the context(mdt_thread_info), in case it is + * not initialized yet. Usually it happens during start up, after + * MDS(ptlrpc threads) is start up, it gets the first CONNECT request, + * before MDT_thread_info is initialized */ + lu_env_refill(env); LASSERT(env != NULL); LASSERT(env->le_ses != NULL); LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread); @@ -3427,41 +3196,6 @@ int mdt_recovery_handle(struct ptlrpc_request *req) RETURN(rc); } -static int mdt_regular_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_regular_handlers); -} - -static int mdt_readpage_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_readpage_handlers); -} - -static int mdt_xmds_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_xmds_handlers); -} - -static int mdt_mdsc_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_seq_handlers); -} - -static int mdt_mdss_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_seq_handlers); -} - -static int mdt_dtss_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_seq_handlers); -} - -static int mdt_fld_handle(struct ptlrpc_request *req) -{ - return mdt_handle_common(req, mdt_fld_handlers); -} - enum mdt_it_code { MDT_IT_OPEN, MDT_IT_OCREAT, @@ -3480,11 +3214,11 @@ enum mdt_it_code { static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, - int); + __u64); static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, - int); + __u64); static struct mdt_it_flavor { const struct req_format *it_fmt; @@ -3492,7 +3226,7 @@ static struct mdt_it_flavor { int (*it_act)(enum mdt_it_code , struct mdt_thread_info *, struct ldlm_lock **, - int); + __u64); long it_reint; } mdt_it_flavor[] = { [MDT_IT_OPEN] = { @@ -3556,7 +3290,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp, struct ldlm_lock *new_lock, struct mdt_lock_handle *lh, - int flags) + __u64 flags) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_lock *lock = *lockp; @@ -3697,7 +3431,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, - int flags) + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_lock *new_lock = NULL; @@ -3715,7 +3449,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody); - info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT); info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF); repbody->eadatasize = 0; repbody->aclsize = 0; @@ -3779,7 +3512,7 @@ out_shrink: static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, - int flags) + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *rep = NULL; @@ -3911,7 +3644,7 @@ static int mdt_intent_code(long itcode) } static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, - struct ldlm_lock **lockp, int flags) + struct ldlm_lock **lockp, __u64 flags) { struct req_capsule *pill; struct mdt_it_flavor *flv; @@ -3931,6 +3664,7 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, if (qmt == NULL) RETURN(-EOPNOTSUPP); + (*lockp)->l_lvb_type = LVB_T_LQUOTA; /* pass the request to quota master */ rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt, mdt_info_req(info), lockp, @@ -3938,6 +3672,14 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, RETURN(rc); } + if (opc == MDT_IT_LAYOUT) { + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT + * lock enabled. */ + } else if (opc == MDT_IT_READDIR) { + req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); + } + flv = &mdt_it_flavor[opc]; if (flv->it_fmt != NULL) req_capsule_extend(pill, flv->it_fmt); @@ -3960,7 +3702,7 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, static int mdt_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data) + ldlm_mode_t mode, __u64 flags, void *data) { struct mdt_thread_info *info; struct ptlrpc_request *req = req_cookie; @@ -4000,6 +3742,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, } else { /* No intent was provided */ LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE); + req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); rc = req_capsule_server_pack(pill); if (rc) rc = err_serious(rc); @@ -4235,7 +3978,7 @@ static int mdt_fld_fini(const struct lu_env *env, ENTRY; if (ms && ms->ms_server_fld) { - fld_server_fini(ms->ms_server_fld, env); + fld_server_fini(env, ms->ms_server_fld); OBD_FREE_PTR(ms->ms_server_fld); ms->ms_server_fld = NULL; } @@ -4257,9 +4000,8 @@ static int mdt_fld_init(const struct lu_env *env, if (ms->ms_server_fld == NULL) RETURN(rc = -ENOMEM); - rc = fld_server_init(ms->ms_server_fld, - m->mdt_bottom, uuid, - env, ms->ms_node_id); + rc = fld_server_init(env, ms->ms_server_fld, m->mdt_bottom, uuid, + ms->ms_node_id); if (rc) { OBD_FREE_PTR(ms->ms_server_fld); ms->ms_server_fld = NULL; @@ -4269,381 +4011,6 @@ static int mdt_fld_init(const struct lu_env *env, RETURN(0); } -/* device init/fini methods */ -static void mdt_stop_ptlrpc_service(struct mdt_device *m) -{ - ENTRY; - if (m->mdt_regular_service != NULL) { - ptlrpc_unregister_service(m->mdt_regular_service); - m->mdt_regular_service = NULL; - } - if (m->mdt_readpage_service != NULL) { - ptlrpc_unregister_service(m->mdt_readpage_service); - m->mdt_readpage_service = NULL; - } - if (m->mdt_xmds_service != NULL) { - ptlrpc_unregister_service(m->mdt_xmds_service); - m->mdt_xmds_service = NULL; - } - if (m->mdt_setattr_service != NULL) { - ptlrpc_unregister_service(m->mdt_setattr_service); - m->mdt_setattr_service = NULL; - } - if (m->mdt_mdsc_service != NULL) { - ptlrpc_unregister_service(m->mdt_mdsc_service); - m->mdt_mdsc_service = NULL; - } - if (m->mdt_mdss_service != NULL) { - ptlrpc_unregister_service(m->mdt_mdss_service); - m->mdt_mdss_service = NULL; - } - if (m->mdt_dtss_service != NULL) { - ptlrpc_unregister_service(m->mdt_dtss_service); - m->mdt_dtss_service = NULL; - } - if (m->mdt_fld_service != NULL) { - ptlrpc_unregister_service(m->mdt_fld_service); - m->mdt_fld_service = NULL; - } - EXIT; -} - -static int mdt_start_ptlrpc_service(struct mdt_device *m) -{ - static struct ptlrpc_service_conf conf; - cfs_proc_dir_entry_t *procfs_entry; - int rc = 0; - ENTRY; - - m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client; - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, - "mdt_ldlm_client", m->mdt_ldlm_client); - - procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry; - - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME, - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = MDS_MAXREQSIZE, - .bc_rep_max_size = MDS_MAXREPSIZE, - .bc_req_portal = MDS_REQUEST_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - /* - * We'd like to have a mechanism to set this on a per-device - * basis, but alas... - */ - .psc_thr = { - .tc_thr_name = LUSTRE_MDT_NAME, - .tc_thr_factor = MDT_THR_FACTOR, - .tc_nthrs_init = MDT_NTHRS_INIT, - .tc_nthrs_base = MDT_NTHRS_BASE, - .tc_nthrs_max = MDT_NTHRS_MAX, - .tc_nthrs_user = mds_num_threads, - .tc_cpu_affinity = 1, - .tc_ctx_tags = LCT_MD_THREAD, - }, - .psc_cpt = { - .cc_pattern = mds_num_cpts, - }, - .psc_ops = { - .so_req_handler = mdt_regular_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = ptlrpc_hpreq_handler, - }, - }; - m->mdt_regular_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_regular_service)) { - rc = PTR_ERR(m->mdt_regular_service); - CERROR("failed to start regular mdt service: %d\n", rc); - m->mdt_regular_service = NULL; - - RETURN(rc); - } - - /* - * readpage service configuration. Parameters have to be adjusted, - * ideally. - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_readpage", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = MDS_MAXREQSIZE, - .bc_rep_max_size = MDS_MAXREPSIZE, - .bc_req_portal = MDS_READPAGE_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_rdpg", - .tc_thr_factor = MDT_RDPG_THR_FACTOR, - .tc_nthrs_init = MDT_RDPG_NTHRS_INIT, - .tc_nthrs_base = MDT_RDPG_NTHRS_BASE, - .tc_nthrs_max = MDT_RDPG_NTHRS_MAX, - .tc_nthrs_user = mds_rdpg_num_threads, - .tc_cpu_affinity = 1, - .tc_ctx_tags = LCT_MD_THREAD, - }, - .psc_cpt = { - .cc_pattern = mds_rdpg_num_cpts, - }, - .psc_ops = { - .so_req_handler = mdt_readpage_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_readpage_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_readpage_service)) { - rc = PTR_ERR(m->mdt_readpage_service); - CERROR("failed to start readpage service: %d\n", rc); - m->mdt_readpage_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* - * setattr service configuration. - * - * XXX To keep the compatibility with old client(< 2.2), we need to - * preserve this portal for a certain time, it should be removed - * eventually. LU-617. - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_setattr", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = MDS_MAXREQSIZE, - .bc_rep_max_size = MDS_MAXREPSIZE, - .bc_req_portal = MDS_SETATTR_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_attr", - .tc_thr_factor = MDT_SETA_THR_FACTOR, - .tc_nthrs_init = MDT_SETA_NTHRS_INIT, - .tc_nthrs_base = MDT_SETA_NTHRS_BASE, - .tc_nthrs_max = MDT_SETA_NTHRS_MAX, - .tc_nthrs_user = mds_attr_num_threads, - .tc_cpu_affinity = 1, - .tc_ctx_tags = LCT_MD_THREAD, - }, - .psc_cpt = { - .cc_pattern = mds_attr_num_cpts, - }, - .psc_ops = { - .so_req_handler = mdt_regular_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_setattr_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_setattr_service)) { - rc = PTR_ERR(m->mdt_setattr_service); - CERROR("failed to start setattr service: %d\n", rc); - m->mdt_setattr_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* - * sequence controller service configuration - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_mdsc", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = SEQ_MAXREQSIZE, - .bc_rep_max_size = SEQ_MAXREPSIZE, - .bc_req_portal = SEQ_CONTROLLER_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_mdsc", - .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, - .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, - .tc_ctx_tags = LCT_MD_THREAD, - }, - .psc_ops = { - .so_req_handler = mdt_mdsc_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_mdsc_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_mdsc_service)) { - rc = PTR_ERR(m->mdt_mdsc_service); - CERROR("failed to start seq controller service: %d\n", rc); - m->mdt_mdsc_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* - * metadata sequence server service configuration - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_mdss", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = SEQ_MAXREQSIZE, - .bc_rep_max_size = SEQ_MAXREPSIZE, - .bc_req_portal = SEQ_METADATA_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_mdss", - .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, - .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, - .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD - }, - .psc_ops = { - .so_req_handler = mdt_mdss_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_mdss_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_mdss_service)) { - rc = PTR_ERR(m->mdt_mdss_service); - CERROR("failed to start metadata seq server service: %d\n", rc); - m->mdt_mdss_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* - * Data sequence server service configuration. We want to have really - * cluster-wide sequences space. This is why we start only one sequence - * controller which manages space. - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_dtss", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = SEQ_MAXREQSIZE, - .bc_rep_max_size = SEQ_MAXREPSIZE, - .bc_req_portal = SEQ_DATA_PORTAL, - .bc_rep_portal = OSC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_dtss", - .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, - .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, - .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD - }, - .psc_ops = { - .so_req_handler = mdt_dtss_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_dtss_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_dtss_service)) { - rc = PTR_ERR(m->mdt_dtss_service); - CERROR("failed to start data seq server service: %d\n", rc); - m->mdt_dtss_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* FLD service start */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_fld", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = FLD_MAXREQSIZE, - .bc_rep_max_size = FLD_MAXREPSIZE, - .bc_req_portal = FLD_REQUEST_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_fld", - .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, - .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, - .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD - }, - .psc_ops = { - .so_req_handler = mdt_fld_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = NULL, - }, - }; - m->mdt_fld_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_fld_service)) { - rc = PTR_ERR(m->mdt_fld_service); - CERROR("failed to start fld service: %d\n", rc); - m->mdt_fld_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - /* - * mds-mds service configuration. Separate portal is used to allow - * mds-mds requests be not blocked during recovery. - */ - memset(&conf, 0, sizeof(conf)); - conf = (typeof(conf)) { - .psc_name = LUSTRE_MDT_NAME "_mds", - .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, - .psc_buf = { - .bc_nbufs = MDS_NBUFS, - .bc_buf_size = MDS_BUFSIZE, - .bc_req_max_size = MDS_MAXREQSIZE, - .bc_rep_max_size = MDS_MAXREPSIZE, - .bc_req_portal = MDS_MDS_PORTAL, - .bc_rep_portal = MDC_REPLY_PORTAL, - }, - .psc_thr = { - .tc_thr_name = "mdt_mds", - .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, - .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, - .tc_ctx_tags = LCT_MD_THREAD, - }, - .psc_ops = { - .so_req_handler = mdt_xmds_handle, - .so_req_printer = target_print_req, - .so_hpreq_handler = ptlrpc_hpreq_handler, - }, - }; - m->mdt_xmds_service = ptlrpc_register_service(&conf, procfs_entry); - if (IS_ERR(m->mdt_xmds_service)) { - rc = PTR_ERR(m->mdt_xmds_service); - CERROR("failed to start xmds service: %d\n", rc); - m->mdt_xmds_service = NULL; - - GOTO(err_mdt_svc, rc); - } - - EXIT; -err_mdt_svc: - if (rc) - mdt_stop_ptlrpc_service(m); - - return rc; -} - static void mdt_stack_fini(const struct lu_env *env, struct mdt_device *m, struct lu_device *top) { @@ -5028,14 +4395,13 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) ping_evictor_stop(); - mdt_stop_ptlrpc_service(m); mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT); obd_exports_barrier(obd); obd_zombie_barrier(); mdt_procfs_fini(m); - lut_fini(env, &m->mdt_lut); + tgt_fini(env, &m->mdt_lut); mdt_fs_cleanup(env, m); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; @@ -5092,10 +4458,10 @@ static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial) sptlrpc_target_update_exp_flavor(obd, &tmp_rset); - cfs_write_lock(&m->mdt_sptlrpc_lock); + write_lock(&m->mdt_sptlrpc_lock); sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset); m->mdt_sptlrpc_rset = tmp_rset; - cfs_write_unlock(&m->mdt_sptlrpc_lock); + write_unlock(&m->mdt_sptlrpc_lock); return 0; } @@ -5153,18 +4519,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* CMD is supported only in IAM mode */ LASSERT(num); node_id = simple_strtol(num, NULL, 10); - if (!(lsi->lsi_flags & LDD_F_IAM_DIR) && node_id) { - CERROR("CMD Operation not allowed in IOP mode\n"); - GOTO(err_lmi, rc = -EINVAL); - } - - obd->u.obt.obt_magic = OBT_MAGIC; + obd->u.obt.obt_magic = OBT_MAGIC; } - cfs_rwlock_init(&m->mdt_sptlrpc_lock); + rwlock_init(&m->mdt_sptlrpc_lock); sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset); - cfs_spin_lock_init(&m->mdt_ioepoch_lock); + spin_lock_init(&m->mdt_ioepoch_lock); m->mdt_opts.mo_compat_resname = 0; m->mdt_opts.mo_mds_capa = 1; m->mdt_opts.mo_oss_capa = 1; @@ -5176,8 +4537,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids); m->mdt_nosquash_str = NULL; m->mdt_nosquash_strlen = 0; - cfs_init_rwsem(&m->mdt_squash_sem); - cfs_spin_lock_init(&m->mdt_osfs_lock); + init_rwsem(&m->mdt_squash_sem); + spin_lock_init(&m->mdt_osfs_lock); m->mdt_osfs_age = cfs_time_shift_64(-1000); m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; @@ -5189,12 +4550,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, rc = mdt_stack_init((struct lu_env *)env, m, cfg); if (rc) { CERROR("Can't init device stack, rc %d\n", rc); - RETURN(rc); + GOTO(err_lmi, rc); } s = m->mdt_md_dev.md_lu_dev.ld_site; mite = &m->mdt_mite; s->ld_md_site = mite; + mite->ms_lu = s; /* set server index */ mite->ms_node_id = node_id; @@ -5215,7 +4577,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } } - rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom); + rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom); if (rc) GOTO(err_fini_stack, rc); @@ -5286,9 +4648,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_procfs, rc); - rc = mdt_start_ptlrpc_service(m); - if (rc) - GOTO(err_quota, rc); + m->mdt_ldlm_client = &mdt2obd_dev(m)->obd_ldlm_client; + ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, + "mdt_ldlm_client", m->mdt_ldlm_client); ping_evictor_start(); @@ -5306,10 +4668,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, RETURN(0); - ping_evictor_stop(); - mdt_stop_ptlrpc_service(m); -err_quota: - mdt_quota_fini(env, m); err_procfs: mdt_procfs_fini(m); err_recovery: @@ -5328,7 +4686,7 @@ err_free_ns: err_fini_seq: mdt_seq_fini(env, m); mdt_fld_fini(env, m); - lut_fini(env, &m->mdt_lut); + tgt_fini(env, &m->mdt_lut); err_fini_stack: mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); err_lmi: @@ -5337,10 +4695,13 @@ err_lmi: return (rc); } -/* For interoperability between 1.8 and 2.0. */ +/* For interoperability, the left element is old parameter, the right one + * is the new version of the parameter, if some parameter is deprecated, + * the new version should be set as NULL. */ static struct cfg_interop_param mdt_interop_param[] = { { "mdt.group_upcall", NULL }, - { "mdt.quota_type", "mdd.quota_type" }, + { "mdt.quota_type", NULL }, + { "mdd.quota_type", NULL }, { "mdt.rootsquash", "mdt.root_squash" }, { "mdt.nosquash_nid", "mdt.nosquash_nids" }, { NULL } @@ -5353,7 +4714,7 @@ static int mdt_process_config(const struct lu_env *env, struct mdt_device *m = mdt_dev(d); struct md_device *md_next = m->mdt_child; struct lu_device *next = md2lu_dev(md_next); - int rc = 0; + int rc; ENTRY; switch (cfg->lcfg_command) { @@ -5361,7 +4722,7 @@ static int mdt_process_config(const struct lu_env *env, struct lprocfs_static_vars lvars; struct obd_device *obd = d->ld_obd; - /* For interoperability between 1.8 and 2.0. */ + /* For interoperability */ struct cfg_interop_param *ptr = NULL; struct lustre_cfg *old_cfg = NULL; char *param = NULL; @@ -5376,9 +4737,10 @@ static int mdt_process_config(const struct lu_env *env, ptr = class_find_old_param(param, mdt_interop_param); if (ptr != NULL) { if (ptr->new_param == NULL) { - CWARN("For 1.8 interoperability, skip this %s." + rc = 0; + CWARN("For interoperability, skip this %s." " It is obsolete.\n", ptr->old_param); - break; + break; } CWARN("Found old param %s, changed it to %s.\n", @@ -5442,8 +4804,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, lu_object_init(o, h, d); lu_object_add_top(h, o); o->lo_ops = &mdt_obj_ops; - cfs_mutex_init(&mo->mot_ioepoch_mutex); - cfs_mutex_init(&mo->mot_lov_mutex); + mutex_init(&mo->mot_ioepoch_mutex); + mutex_init(&mo->mot_lov_mutex); RETURN(o); } else RETURN(NULL); @@ -5532,13 +4894,13 @@ static int mdt_prepare(const struct lu_env *env, if (rc) RETURN(rc); - LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &mdt->mdt_state)); + LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state)); target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle); - cfs_set_bit(MDT_FL_CFGLOG, &mdt->mdt_state); + set_bit(MDT_FL_CFGLOG, &mdt->mdt_state); LASSERT(obd->obd_no_conn); - cfs_spin_lock(&obd->obd_dev_lock); + spin_lock(&obd->obd_dev_lock); obd->obd_no_conn = 0; - cfs_spin_unlock(&obd->obd_dev_lock); + spin_unlock(&obd->obd_dev_lock); if (obd->obd_recovering == 0) mdt_postrecov(env, mdt); @@ -5546,7 +4908,7 @@ static int mdt_prepare(const struct lu_env *env, RETURN(rc); } -static const struct lu_device_operations mdt_lu_ops = { +const struct lu_device_operations mdt_lu_ops = { .ldo_object_alloc = mdt_object_alloc, .ldo_process_config = mdt_process_config, .ldo_prepare = mdt_prepare, @@ -5578,97 +4940,115 @@ static int mdt_obd_set_info_async(const struct lu_env *env, RETURN(0); } -/* mds_connect_internal */ +/** + * Match client and server connection feature flags. + * + * Compute the compatibility flags for a connection request based on + * features mutually supported by client and server. + * + * The obd_export::exp_connect_flags field in \a exp must not be updated + * here, otherwise a partially initialized value may be exposed. After + * the connection request is successfully processed, the top-level MDT + * connect request handler atomically updates the export connect flags + * from the obd_connect_data::ocd_connect_flags field of the reply. + * \see mdt_connect(). + * + * \param exp the obd_export associated with this client/target pair + * \param mdt the target device for the connection + * \param data stores data for this connect request + * + * \retval 0 success + * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size + * \retval -EBADE client and server feature requirements are incompatible + */ static int mdt_connect_internal(struct obd_export *exp, - struct mdt_device *mdt, - struct obd_connect_data *data) -{ - if (data != NULL) { - data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; - data->ocd_ibits_known &= MDS_INODELOCK_FULL; - - /* If no known bits (which should not happen, probably, - as everybody should support LOOKUP and UPDATE bits at least) - revert to compat mode with plain locks. */ - if (!data->ocd_ibits_known && - data->ocd_connect_flags & OBD_CONNECT_IBITS) - data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; - - if (!mdt->mdt_opts.mo_acl) - data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - - if (!mdt->mdt_opts.mo_user_xattr) - data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; - - if (!mdt->mdt_som_conf) - data->ocd_connect_flags &= ~OBD_CONNECT_SOM; - - if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { - data->ocd_brw_size = min(data->ocd_brw_size, - (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); - if (data->ocd_brw_size == 0) { - CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 - " ocd_version: %x ocd_grant: %d " - "ocd_index: %u ocd_brw_size is " - "unexpectedly zero, network data " - "corruption? Refusing connection of this" - " client\n", - exp->exp_obd->obd_name, - exp->exp_client_uuid.uuid, - exp, data->ocd_connect_flags, data->ocd_version, - data->ocd_grant, data->ocd_index); - return -EPROTO; - } - } + struct mdt_device *mdt, + struct obd_connect_data *data) +{ + LASSERT(data != NULL); + + data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; + data->ocd_ibits_known &= MDS_INODELOCK_FULL; + + /* If no known bits (which should not happen, probably, + as everybody should support LOOKUP and UPDATE bits at least) + revert to compat mode with plain locks. */ + if (!data->ocd_ibits_known && + data->ocd_connect_flags & OBD_CONNECT_IBITS) + data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; + + if (!mdt->mdt_opts.mo_acl) + data->ocd_connect_flags &= ~OBD_CONNECT_ACL; + + if (!mdt->mdt_opts.mo_user_xattr) + data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; + + if (!mdt->mdt_som_conf) + data->ocd_connect_flags &= ~OBD_CONNECT_SOM; + + if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { + data->ocd_brw_size = min(data->ocd_brw_size, + (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + if (data->ocd_brw_size == 0) { + CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 + " ocd_version: %x ocd_grant: %d " + "ocd_index: %u ocd_brw_size is " + "unexpectedly zero, network data " + "corruption? Refusing connection of this" + " client\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, + exp, data->ocd_connect_flags, data->ocd_version, + data->ocd_grant, data->ocd_index); + return -EPROTO; + } + } - cfs_spin_lock(&exp->exp_lock); - exp->exp_connect_flags = data->ocd_connect_flags; - cfs_spin_unlock(&exp->exp_lock); - data->ocd_version = LUSTRE_VERSION_CODE; - exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known; - } + /* NB: Disregard the rule against updating exp_connect_flags in this + * case, since tgt_client_new() needs to know if this is a lightweight + * connection, and it is safe to expose this flag before connection + * processing completes. */ + if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) { + spin_lock(&exp->exp_lock); + exp->exp_connect_flags |= OBD_CONNECT_LIGHTWEIGHT; + spin_unlock(&exp->exp_lock); + } -#if 0 - if (mdt->mdt_opts.mo_acl && - ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { - CWARN("%s: MDS requires ACL support but client does not\n", - mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); - return -EBADE; - } -#endif + data->ocd_version = LUSTRE_VERSION_CODE; + exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known; - if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) { - CWARN("%s: MDS requires FID support, but client not\n", - mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); - return -EBADE; - } + if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { + CWARN("%s: MDS requires FID support, but client not\n", + mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); + return -EBADE; + } - if (mdt->mdt_som_conf && !exp_connect_som(exp) && - !(exp->exp_connect_flags & OBD_CONNECT_MDS_MDS)) { - CWARN("%s: MDS has SOM enabled, but client does not support " - "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); - return -EBADE; - } + if (mdt->mdt_som_conf && + !(data->ocd_connect_flags & (OBD_CONNECT_MDS_MDS|OBD_CONNECT_SOM))){ + CWARN("%s: MDS has SOM enabled, but client does not support " + "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); + return -EBADE; + } - return 0; + return 0; } static int mdt_connect_check_sptlrpc(struct mdt_device *mdt, - struct obd_export *exp, - struct ptlrpc_request *req) + struct obd_export *exp, + struct ptlrpc_request *req) { - struct sptlrpc_flavor flvr; - int rc = 0; + struct sptlrpc_flavor flvr; + int rc = 0; - if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) { - cfs_read_lock(&mdt->mdt_sptlrpc_lock); - sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset, - req->rq_sp_from, - req->rq_peer.nid, - &flvr); - cfs_read_unlock(&mdt->mdt_sptlrpc_lock); + if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) { + read_lock(&mdt->mdt_sptlrpc_lock); + sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset, + req->rq_sp_from, + req->rq_peer.nid, + &flvr); + read_unlock(&mdt->mdt_sptlrpc_lock); - cfs_spin_lock(&exp->exp_lock); + spin_lock(&exp->exp_lock); exp->exp_sp_peer = req->rq_sp_from; exp->exp_flvr = flvr; @@ -5682,7 +5062,7 @@ static int mdt_connect_check_sptlrpc(struct mdt_device *mdt, rc = -EACCES; } - cfs_spin_unlock(&exp->exp_lock); + spin_unlock(&exp->exp_lock); } else { if (exp->exp_sp_peer != req->rq_sp_from) { CERROR("RPC source %s doesn't match %s\n", @@ -5725,11 +5105,11 @@ static int mdt_obd_connect(const struct lu_env *env, * XXX: probably not very appropriate method is used now * at some point we should find a better one */ - if (!cfs_test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) { + if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) { rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd); if (rc) RETURN(-EAGAIN); - cfs_set_bit(MDT_FL_SYNCED, &mdt->mdt_state); + set_bit(MDT_FL_SYNCED, &mdt->mdt_state); } rc = class_connect(&conn, obd, cluuid); @@ -5753,7 +5133,7 @@ static int mdt_obd_connect(const struct lu_env *env, LASSERT(lcd); info->mti_exp = lexp; memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); - rc = lut_client_new(env, lexp); + rc = tgt_client_new(env, lexp); if (rc == 0) mdt_export_stats_init(obd, lexp, localdata); } @@ -5811,17 +5191,17 @@ static int mdt_export_cleanup(struct obd_export *exp) int rc = 0; ENTRY; - cfs_spin_lock(&med->med_open_lock); - while (!cfs_list_empty(&med->med_open_head)) { - cfs_list_t *tmp = med->med_open_head.next; - mfd = cfs_list_entry(tmp, struct mdt_file_data, mfd_list); + spin_lock(&med->med_open_lock); + while (!cfs_list_empty(&med->med_open_head)) { + cfs_list_t *tmp = med->med_open_head.next; + mfd = cfs_list_entry(tmp, struct mdt_file_data, mfd_list); - /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); - cfs_list_move_tail(&mfd->mfd_list, &closing_list); - } - cfs_spin_unlock(&med->med_open_lock); + /* Remove mfd handle so it can't be found again. + * We are consuming the mfd_list reference here. */ + class_handle_unhash(&mfd->mfd_handle); + cfs_list_move_tail(&mfd->mfd_list, &closing_list); + } + spin_unlock(&med->med_open_lock); mdt = mdt_dev(obd->obd_lu_dev); LASSERT(mdt != NULL); @@ -5855,7 +5235,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* cleanup client slot early */ /* Do not erase record for recoverable client. */ if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) - lut_client_del(&env, exp); + tgt_client_del(&env, exp); lu_env_fini(&env); RETURN(rc); @@ -5886,19 +5266,19 @@ static int mdt_init_export(struct obd_export *exp) ENTRY; CFS_INIT_LIST_HEAD(&med->med_open_head); - cfs_spin_lock_init(&med->med_open_lock); - cfs_mutex_init(&med->med_idmap_mutex); - med->med_idmap = NULL; - cfs_spin_lock(&exp->exp_lock); - exp->exp_connecting = 1; - cfs_spin_unlock(&exp->exp_lock); + spin_lock_init(&med->med_open_lock); + mutex_init(&med->med_idmap_mutex); + med->med_idmap = NULL; + spin_lock(&exp->exp_lock); + exp->exp_connecting = 1; + spin_unlock(&exp->exp_lock); /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) RETURN(0); - rc = lut_client_alloc(exp); + rc = tgt_client_alloc(exp); if (rc) GOTO(err, rc); @@ -5909,7 +5289,7 @@ static int mdt_init_export(struct obd_export *exp) RETURN(rc); err_free: - lut_client_free(exp); + tgt_client_free(exp); err: CERROR("%s: Failed to initialize export: rc = %d\n", exp->exp_obd->obd_name, rc); @@ -5931,7 +5311,7 @@ static int mdt_destroy_export(struct obd_export *exp) RETURN(0); ldlm_destroy_export(exp); - lut_client_free(exp); + tgt_client_free(exp); LASSERT(cfs_list_empty(&exp->exp_outstanding_replies)); LASSERT(cfs_list_empty(&exp->exp_mdt_data.med_open_head)); @@ -5961,45 +5341,53 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, } static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt, - struct getinfo_fid2path *fp) + struct getinfo_fid2path *fp) { - struct mdt_object *obj; - int rc; - ENTRY; + struct mdt_object *obj; + struct obd_device *obd = mdt2obd_dev(mdt); + int rc; + ENTRY; - CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n", - PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno); + CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n", + PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno); - if (!fid_is_sane(&fp->gf_fid)) - RETURN(-EINVAL); + if (!fid_is_sane(&fp->gf_fid)) + RETURN(-EINVAL); - obj = mdt_object_find(env, mdt, &fp->gf_fid); - if (obj == NULL || IS_ERR(obj)) { - CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid), - PTR_ERR(obj)); - RETURN(-EINVAL); - } + if (!fid_is_norm(&fp->gf_fid) && !fid_is_igif(&fp->gf_fid)) { + CWARN("%s: "DFID" is invalid, sequence should be " + ">= "LPX64"\n", obd->obd_name, + PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL); + RETURN(-EINVAL); + } - rc = lu_object_exists(&obj->mot_obj.mo_lu); - if (rc <= 0) { - if (rc == -1) - rc = -EREMOTE; - else - rc = -ENOENT; - mdt_object_put(env, obj); - CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n", - PFID(&fp->gf_fid), rc); - RETURN(rc); - } + obj = mdt_object_find(env, mdt, &fp->gf_fid); + if (obj == NULL || IS_ERR(obj)) { + CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid), + PTR_ERR(obj)); + RETURN(-EINVAL); + } + + rc = lu_object_exists(&obj->mot_obj.mo_lu); + if (rc <= 0) { + if (rc == -1) + rc = -EREMOTE; + else + rc = -ENOENT; + mdt_object_put(env, obj); + CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n", + PFID(&fp->gf_fid), rc); + RETURN(rc); + } - rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path, - fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno); - mdt_object_put(env, obj); + rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path, + fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno); + mdt_object_put(env, obj); - RETURN(rc); + RETURN(rc); } -static int mdt_get_info(struct mdt_thread_info *info) +int mdt_get_info(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); char *key; @@ -6331,9 +5719,14 @@ static void mdt_key_fini(const struct lu_context *ctx, /* context key: mdt_thread_key */ LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD); -struct md_ucred *mdt_ucred(const struct mdt_thread_info *info) +struct lu_ucred *mdt_ucred(const struct mdt_thread_info *info) { - return md_ucred(info->mti_env); + return lu_ucred(info->mti_env); +} + +struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) +{ + return lu_ucred_check(info->mti_env); } /** @@ -6372,16 +5765,7 @@ int mdt_cos_is_enabled(struct mdt_device *mdt) return mdt->mdt_opts.mo_cos != 0; } -/* type constructor/destructor: mdt_type_init, mdt_type_fini */ -LU_TYPE_INIT_FINI(mdt, &mdt_thread_key); - static struct lu_device_type_operations mdt_device_type_ops = { - .ldto_init = mdt_type_init, - .ldto_fini = mdt_type_fini, - - .ldto_start = mdt_type_start, - .ldto_stop = mdt_type_stop, - .ldto_device_alloc = mdt_device_alloc, .ldto_device_free = mdt_device_free, .ldto_device_fini = mdt_device_fini @@ -6396,260 +5780,39 @@ static struct lu_device_type mdt_device_type = { static int __init mdt_mod_init(void) { - struct lprocfs_static_vars lvars; - int rc; + struct lprocfs_static_vars lvars; + int rc; rc = lu_kmem_init(mdt_caches); if (rc) return rc; - if (mdt_num_threads != 0 && mds_num_threads == 0) { - LCONSOLE_INFO("mdt_num_threads module parameter is deprecated," - "use mds_num_threads instead or unset both for" - "dynamic thread startup\n"); - mds_num_threads = mdt_num_threads; - } - - lprocfs_mdt_init_vars(&lvars); - rc = class_register_type(&mdt_obd_device_ops, NULL, - lvars.module_vars, LUSTRE_MDT_NAME, - &mdt_device_type); + rc = mds_mod_init(); + if (rc) + GOTO(lu_fini, rc); + lprocfs_mdt_init_vars(&lvars); + rc = class_register_type(&mdt_obd_device_ops, NULL, + lvars.module_vars, LUSTRE_MDT_NAME, + &mdt_device_type); + if (rc) + GOTO(mds_fini, rc); +lu_fini: if (rc) lu_kmem_fini(mdt_caches); - return rc; +mds_fini: + if (rc) + mds_mod_exit(); + return rc; } static void __exit mdt_mod_exit(void) { - class_unregister_type(LUSTRE_MDT_NAME); + class_unregister_type(LUSTRE_MDT_NAME); + mds_mod_exit(); lu_kmem_fini(mdt_caches); } -#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt) \ -[opc - base] = { \ - .mh_name = #opc, \ - .mh_fail_id = OBD_FAIL_ ## opc ## _NET, \ - .mh_opc = opc, \ - .mh_flags = flags, \ - .mh_act = fn, \ - .mh_fmt = fmt \ -} - -/* Request with a format known in advance */ -#define DEF_MDT_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name) - -/* Request with a format we do not yet know */ -#define DEF_MDT_HDL_VAR(flags, name, fn) \ - DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL) - -/* Map one non-standard request format handler. This should probably get - * a common OBD_SET_INFO RPC opcode instead of this mismatch. */ -#define RQF_MDS_SET_INFO RQF_OBD_SET_INFO - -static struct mdt_handler mdt_mds_ops[] = { -DEF_MDT_HDL(0, MDS_CONNECT, mdt_connect), -DEF_MDT_HDL(0, MDS_DISCONNECT, mdt_disconnect), -DEF_MDT_HDL(0, MDS_SET_INFO, mdt_set_info), -DEF_MDT_HDL(0, MDS_GET_INFO, mdt_get_info), -DEF_MDT_HDL(0 | HABEO_REFERO, MDS_GETSTATUS, mdt_getstatus), -DEF_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr), -DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name), -DEF_MDT_HDL(HABEO_CORPUS, MDS_GETXATTR, mdt_getxattr), -DEF_MDT_HDL(0 | HABEO_REFERO, MDS_STATFS, mdt_statfs), -DEF_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint), -DEF_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close), -DEF_MDT_HDL(HABEO_CORPUS, MDS_DONE_WRITING, mdt_done_writing), -DEF_MDT_HDL(0 | HABEO_REFERO, MDS_PIN, mdt_pin), -DEF_MDT_HDL_VAR(0, MDS_SYNC, mdt_sync), -DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR, mdt_is_subdir), -DEF_MDT_HDL(0, MDS_QUOTACHECK, mdt_quotacheck), -DEF_MDT_HDL(0, MDS_QUOTACTL, mdt_quotactl) -}; - -#define DEF_OBD_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL) - -static struct mdt_handler mdt_obd_ops[] = { -DEF_OBD_HDL(0, OBD_PING, mdt_obd_ping), -DEF_OBD_HDL(0, OBD_LOG_CANCEL, mdt_obd_log_cancel), -DEF_OBD_HDL(0, OBD_QC_CALLBACK, mdt_obd_qc_callback), -DEF_OBD_HDL(0, OBD_IDX_READ, mdt_obd_idx_read) -}; - -#define DEF_DLM_HDL_VAR(flags, name, fn) \ - DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL) -#define DEF_DLM_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name) - -static struct mdt_handler mdt_dlm_ops[] = { -DEF_DLM_HDL (HABEO_CLAVIS, LDLM_ENQUEUE, mdt_enqueue), -DEF_DLM_HDL_VAR(HABEO_CLAVIS, LDLM_CONVERT, mdt_convert), -DEF_DLM_HDL_VAR(0, LDLM_BL_CALLBACK, mdt_bl_callback), -DEF_DLM_HDL_VAR(0, LDLM_CP_CALLBACK, mdt_cp_callback) -}; - -#define DEF_LLOG_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL) - -static struct mdt_handler mdt_llog_ops[] = { -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_create), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, mdt_llog_next_block), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, mdt_llog_read_header), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_WRITE_REC, NULL), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CLOSE, NULL), -DEF_LLOG_HDL(0, LLOG_ORIGIN_CONNECT, NULL), -DEF_LLOG_HDL(0, LLOG_CATINFO, NULL), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, mdt_llog_prev_block), -DEF_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_DESTROY, mdt_llog_destroy), -}; - -#define DEF_SEC_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL) - -static struct mdt_handler mdt_sec_ctx_ops[] = { -DEF_SEC_HDL(0, SEC_CTX_INIT, mdt_sec_ctx_handle), -DEF_SEC_HDL(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle), -DEF_SEC_HDL(0, SEC_CTX_FINI, mdt_sec_ctx_handle) -}; - -#define DEF_QUOTA_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name) - -static struct mdt_handler mdt_quota_ops[] = { -DEF_QUOTA_HDL(HABEO_REFERO, QUOTA_DQACQ, mdt_quota_dqacq), -}; - -static struct mdt_opc_slice mdt_regular_handlers[] = { - { - .mos_opc_start = MDS_GETATTR, - .mos_opc_end = MDS_LAST_OPC, - .mos_hs = mdt_mds_ops - }, - { - .mos_opc_start = OBD_PING, - .mos_opc_end = OBD_LAST_OPC, - .mos_hs = mdt_obd_ops - }, - { - .mos_opc_start = LDLM_ENQUEUE, - .mos_opc_end = LDLM_LAST_OPC, - .mos_hs = mdt_dlm_ops - }, - { - .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE, - .mos_opc_end = LLOG_LAST_OPC, - .mos_hs = mdt_llog_ops - }, - { - .mos_opc_start = SEC_CTX_INIT, - .mos_opc_end = SEC_LAST_OPC, - .mos_hs = mdt_sec_ctx_ops - }, - { - .mos_opc_start = QUOTA_DQACQ, - .mos_opc_end = QUOTA_LAST_OPC, - .mos_hs = mdt_quota_ops - }, - { - .mos_hs = NULL - } -}; - -/* Readpage/readdir handlers */ -static struct mdt_handler mdt_readpage_ops[] = { -DEF_MDT_HDL(0, MDS_CONNECT, mdt_connect), -DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage), -/* XXX: this is ugly and should be fixed one day, see mdc_close() for - * detailed comments. --umka */ -DEF_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close), -DEF_MDT_HDL(HABEO_CORPUS, MDS_DONE_WRITING, mdt_done_writing), -}; - -static struct mdt_opc_slice mdt_readpage_handlers[] = { - { - .mos_opc_start = MDS_GETATTR, - .mos_opc_end = MDS_LAST_OPC, - .mos_hs = mdt_readpage_ops - }, - { - .mos_opc_start = OBD_FIRST_OPC, - .mos_opc_end = OBD_LAST_OPC, - .mos_hs = mdt_obd_ops - }, - { - .mos_hs = NULL - } -}; - -/* Cross MDT operation handlers for DNE */ -static struct mdt_handler mdt_xmds_ops[] = { -DEF_MDT_HDL(0, MDS_CONNECT, mdt_connect), -DEF_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr), -DEF_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint), -DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR, mdt_is_subdir), -}; - -static struct mdt_opc_slice mdt_xmds_handlers[] = { - { - .mos_opc_start = MDS_GETATTR, - .mos_opc_end = MDS_LAST_OPC, - .mos_hs = mdt_xmds_ops - }, - { - .mos_opc_start = OBD_PING, - .mos_opc_end = OBD_LAST_OPC, - .mos_hs = mdt_obd_ops - }, - { - .mos_opc_start = SEC_CTX_INIT, - .mos_opc_end = SEC_LAST_OPC, - .mos_hs = mdt_sec_ctx_ops - }, - { - .mos_hs = NULL - } -}; - -/* Sequence service handlers */ -#define DEF_SEQ_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name) - -static struct mdt_handler mdt_seq_ops[] = { -DEF_SEQ_HDL(0, SEQ_QUERY, (void *)seq_query), -}; - -static struct mdt_opc_slice mdt_seq_handlers[] = { - { - .mos_opc_start = SEQ_QUERY, - .mos_opc_end = SEQ_LAST_OPC, - .mos_hs = mdt_seq_ops - }, - { - .mos_hs = NULL - } -}; - -/* FID Location Database handlers */ -#define DEF_FLD_HDL(flags, name, fn) \ - DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name) - -static struct mdt_handler mdt_fld_ops[] = { -DEF_FLD_HDL(0, FLD_QUERY, (void *)fld_query), -}; - -static struct mdt_opc_slice mdt_fld_handlers[] = { - { - .mos_opc_start = FLD_QUERY, - .mos_opc_end = FLD_LAST_OPC, - .mos_hs = mdt_fld_ops - }, - { - .mos_hs = NULL - } -}; - MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Target ("LUSTRE_MDT_NAME")"); MODULE_LICENSE("GPL");