X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=1cd161b76849858b9d7726fffded1876ff99a525;hb=e2af7fb3c91dfb13d34d8e1b2f2df8c09621f768;hp=def20ca0689b741cefb4f55c9f36331adb3026ef;hpb=d70db3335f52cc49f5e01858d27b0ccd61036c62;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index def20ca..1cd161b 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,17 +24,16 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_LMV #ifdef __KERNEL__ #include @@ -52,7 +49,6 @@ #include #endif -#include #include #include #include @@ -323,7 +319,7 @@ static void lmv_set_timeouts(struct obd_device *obd) if (tgts->ltd_exp == NULL) continue; - obd_set_info_async(tgts->ltd_exp, sizeof(KEY_INTERMDS), + obd_set_info_async(NULL, tgts->ltd_exp, sizeof(KEY_INTERMDS), KEY_INTERMDS, 0, NULL, NULL); } } @@ -568,6 +564,7 @@ int lmv_check_connect(struct obd_device *obd) } if (lmv->desc.ld_tgt_count == 0) { + lmv_init_unlock(lmv); CERROR("%s: no targets configured.\n", obd->obd_name); RETURN(-EINVAL); } @@ -626,8 +623,11 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_obd = class_exp2obd(tgt->ltd_exp); - if (mdc_obd) + if (mdc_obd) { + mdc_obd->obd_force = obd->obd_force; + mdc_obd->obd_fail = obd->obd_fail; mdc_obd->obd_no_recov = obd->obd_no_recov; + } #ifdef __KERNEL__ lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds"); @@ -748,15 +748,20 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, if (!mdc_obd) RETURN(-EINVAL); - rc = obd_statfs(mdc_obd, &stat_buf, - cfs_time_current_64() - CFS_HZ, 0); + /* copy UUID */ + if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), + min((int) data->ioc_plen2, + (int) sizeof(struct obd_uuid)))) + RETURN(-EFAULT); + + rc = obd_statfs(NULL, lmv->tgts[index].ltd_exp, &stat_buf, + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + 0); if (rc) RETURN(rc); if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf, - data->ioc_plen1)) - RETURN(-EFAULT); - if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), - data->ioc_plen2)) + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) RETURN(-EFAULT); break; } @@ -817,14 +822,22 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, sizeof(*icc), icc, NULL); break; } + case LL_IOC_GET_CONNECT_FLAGS: { + rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg); + break; + } default : { for (i = 0; i < count; i++) { int err; + struct obd_device *mdc_obd; if (lmv->tgts[i].ltd_exp == NULL) continue; - + /* ll_umount_begin() sets force flag but for lmv, not + * mdc. Let's pass it through */ + mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp); + mdc_obd->obd_force = obddev->obd_force; err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg); if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) { @@ -974,7 +987,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, * New seq alloc and FLD setup should be atomic. Otherwise we may find * on server that seq in new allocated fid is not yet known. */ - cfs_down(&tgt->ltd_fid_sem); + cfs_mutex_lock(&tgt->ltd_fid_mutex); if (!tgt->ltd_active) GOTO(out, rc = -ENODEV); @@ -990,7 +1003,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, EXIT; out: - cfs_up(&tgt->ltd_fid_sem); + cfs_mutex_unlock(&tgt->ltd_fid_mutex); return rc; } @@ -999,7 +1012,7 @@ int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + mdsno_t mds = 0; int rc; ENTRY; @@ -1061,7 +1074,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(-ENOMEM); for (i = 0; i < LMV_MAX_TGT_COUNT; i++) { - cfs_sema_init(&lmv->tgts[i].ltd_fid_sem, 1); + cfs_mutex_init(&lmv->tgts[i].ltd_fid_mutex); lmv->tgts[i].ltd_idx = i; } @@ -1080,7 +1093,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lmv->lmv_placement = PLACEMENT_CHAR_POLICY; cfs_spin_lock_init(&lmv->lmv_lock); - cfs_sema_init(&lmv->init_sem, 1); + cfs_mutex_init(&lmv->init_mutex); rc = lmv_object_setup(obd); if (rc) { @@ -1092,10 +1105,11 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lprocfs_obd_setup(obd, lvars.obd_vars); #ifdef LPROCFS { - rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd_status", + rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd", 0444, &lmv_proc_target_fops, obd); if (rc) - CWARN("Error adding target_obd_stats file (%d)\n", rc); + CWARN("%s: error adding LMV target_obd file: rc = %d\n", + obd->obd_name, rc); } #endif rc = fld_client_init(&lmv->lmv_fld, obd->obd_name, @@ -1122,7 +1136,6 @@ static int lmv_cleanup(struct obd_device *obd) ENTRY; fld_client_fini(&lmv->lmv_fld); - lprocfs_obd_cleanup(obd); lmv_object_cleanup(obd); OBD_FREE(lmv->datas, lmv->datas_size); OBD_FREE(lmv->tgts, lmv->tgts_size); @@ -1154,9 +1167,10 @@ out: RETURN(rc); } -static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) +static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, + struct obd_statfs *osfs, __u64 max_age, __u32 flags) { + struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; struct obd_statfs *temp; int rc = 0; @@ -1175,7 +1189,7 @@ static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs, if (lmv->tgts[i].ltd_exp == NULL) continue; - rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, + rc = obd_statfs(env, lmv->tgts[i].ltd_exp, temp, max_age, flags); if (rc) { CERROR("can't stat MDS #%d (%s), error %d\n", i, @@ -1368,6 +1382,36 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, RETURN(0); } +static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid, + ldlm_iterator_t it, void *data) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int i; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); + + /* + * With CMD every object can have two locks in different namespaces: + * lookup lock in space of mds storing direntry and update/open lock in + * space of mds storing inode. + */ + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data); + if (rc) + RETURN(rc); + } + + RETURN(rc); +} + + static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, struct md_open_data *mod, struct ptlrpc_request **request) { @@ -1875,7 +1919,7 @@ static int lmv_early_cancel_slaves(struct obd_export *exp, CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n", PFID(st_fid), tgt->ltd_idx); rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, - mode, LDLM_FL_ASYNC, NULL); + mode, LCF_ASYNC, NULL); if (rc) GOTO(out_put_obj, rc); } else { @@ -1926,7 +1970,7 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); policy.l_inodebits.bits = bits; rc = md_cancel_unused(tgt->ltd_exp, fid, &policy, - mode, LDLM_FL_ASYNC, NULL); + mode, LCF_ASYNC, NULL); } else { CDEBUG(D_INODE, "EARLY_CANCEL skip operation target %d on "DFID"\n", @@ -2026,7 +2070,6 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *src_tgt; - struct lmv_tgt_desc *tgt_tgt; int rc; int sidx; int loop = 0; @@ -2086,7 +2129,6 @@ repeat: op_data->op_cap = cfs_curproc_cap_pack(); src_tgt = lmv_get_target(lmv, mds1); - tgt_tgt = lmv_get_target(lmv, mds2); /* * LOOKUP lock on src child (fid3) should also be cancelled for @@ -2275,7 +2317,7 @@ static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj) val = le64_to_cpu(*hash); if (val < hash_adj) val += MAX_HASH_SIZE; - if (val != DIR_END_OFF) + if (val != MDS_DIR_END_OFF) *hash = cpu_to_le64(val - hash_adj); } @@ -2296,15 +2338,14 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid) return id ^ (id >> 32); } -static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, __u64 offset64, struct page *page, - struct ptlrpc_request **request) +static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, + struct page **pages, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lu_fid rid = *fid; struct lmv_object *obj; - __u64 offset; + struct lu_fid rid = op_data->op_fid1; + __u64 offset = op_data->op_offset; __u64 hash_adj = 0; __u32 rank = 0; __u64 seg_size = 0; @@ -2313,14 +2354,17 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, int tgt0_idx = 0; int rc; int nr = 0; + int i; + /* number of pages read, in CFS_PAGE_SIZE */ + int nrdpgs; + /* number of pages transferred in LU_PAGE_SIZE */ + int nlupgs; struct lmv_stripe *los; struct lmv_tgt_desc *tgt; struct lu_dirpage *dp; struct lu_dirent *ent; ENTRY; - offset = offset64; - rc = lmv_check_connect(obd); if (rc) RETURN(rc); @@ -2347,7 +2391,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj * on hash values that we get. */ - obj = lmv_object_find_lock(obd, fid); + obj = lmv_object_find_lock(obd, &rid); if (obj) { nr = obj->lo_objcount; LASSERT(nr > 0); @@ -2355,7 +2399,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, do_div(seg_size, nr); los = obj->lo_stripes; tgt = lmv_get_target(lmv, los[0].ls_mds); - rank = lmv_node_rank(tgt->ltd_exp, fid) % nr; + rank = lmv_node_rank(tgt->ltd_exp, &rid) % nr; tgt_tmp = offset; do_div(tgt_tmp, seg_size); tgt0_idx = do_div(tgt_tmp, nr); @@ -2390,34 +2434,103 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, if (IS_ERR(tgt)) GOTO(cleanup, rc = PTR_ERR(tgt)); - rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request); + op_data->op_fid1 = rid; + rc = md_readpage(tgt->ltd_exp, op_data, pages, request); if (rc) GOTO(cleanup, rc); - if (obj) { - dp = cfs_kmap(page); - lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); - lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); - LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64); + nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1) + >> CFS_PAGE_SHIFT; + nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; + LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); + LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) - lmv_hash_adjust(&ent->lde_hash, hash_adj); + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, + op_data->op_npages); - if (tgt0_idx != nr - 1) { - __u64 end; + for (i = 0; i < nrdpgs; i++) { +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + struct lu_dirpage *first; + __u64 hash_end = 0; + __u32 flags = 0; +#endif + struct lu_dirent *tmp = NULL; - end = le64_to_cpu(dp->ldp_hash_end); - if (end == DIR_END_OFF) { + dp = cfs_kmap(pages[i]); + if (obj) { + lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); + lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); + LASSERT(le64_to_cpu(dp->ldp_hash_start) <= + op_data->op_offset); + + if ((tgt0_idx != nr - 1) && + (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF)) + { dp->ldp_hash_end = cpu_to_le32(seg_size * (tgt0_idx + 1)); CDEBUG(D_INODE, ""DFID" reset end "LPX64" tgt %d\n", PFID(&rid), - le64_to_cpu(dp->ldp_hash_end), tgt_idx); + (__u64)le64_to_cpu(dp->ldp_hash_end), + tgt_idx); } } - cfs_kunmap(page); + + ent = lu_dirent_start(dp); +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + first = dp; + hash_end = dp->ldp_hash_end; +repeat: +#endif + nlupgs--; + for (tmp = ent; ent != NULL; + tmp = ent, ent = lu_dirent_next(ent)) { + if (obj) + lmv_hash_adjust(&ent->lde_hash, hash_adj); + } + +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) { + ent = lu_dirent_start(dp); + + if (obj) { + lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); + if ((tgt0_idx != nr - 1) && + (le64_to_cpu(dp->ldp_hash_end) == + MDS_DIR_END_OFF)) { + hash_end = cpu_to_le32(seg_size * + (tgt0_idx + 1)); + CDEBUG(D_INODE, + ""DFID" reset end "LPX64" tgt %d\n", + PFID(&rid), + (__u64)le64_to_cpu(hash_end), + tgt_idx); + } + } + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; + + if (tmp) { + /* enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage, in this way + * several lu_dirpages can be stored into one + * client page on client. */ + tmp = ((void *)tmp) + + le16_to_cpu(tmp->lde_reclen); + tmp->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)tmp); + goto repeat; + } + } + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); +#else + SET_BUT_UNUSED(tmp); +#endif + cfs_kunmap(pages[i]); } EXIT; cleanup: @@ -2512,7 +2625,8 @@ repeat: static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { - int rc = 0; + struct lmv_obd *lmv = &obd->u.lmv; + int rc = 0; switch (stage) { case OBD_CLEANUP_EARLY: @@ -2520,6 +2634,8 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) * stack. */ break; case OBD_CLEANUP_EXPORTS: + fld_client_proc_fini(&lmv->lmv_fld); + lprocfs_obd_cleanup(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -2530,8 +2646,8 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) RETURN(rc); } -static int lmv_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *vallen, void *val, +static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, + __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) { struct obd_device *obd; @@ -2567,7 +2683,7 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, continue; } - if (!obd_get_info(tgts->ltd_exp, keylen, key, + if (!obd_get_info(env, tgts->ltd_exp, keylen, key, vallen, val, NULL)) RETURN(0); } @@ -2581,22 +2697,25 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, * Forwarding this request to first MDS, it should know LOV * desc. */ - rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key, + rc = obd_get_info(env, lmv->tgts[0].ltd_exp, keylen, key, vallen, val, NULL); if (!rc && KEY_IS(KEY_CONN_DATA)) { exp->exp_connect_flags = ((struct obd_connect_data *)val)->ocd_connect_flags; } RETURN(rc); + } else if (KEY_IS(KEY_TGT_COUNT)) { + *((int *)val) = lmv->desc.ld_tgt_count; + RETURN(0); } CDEBUG(D_IOCTL, "Invalid key\n"); RETURN(-EINVAL); } -int lmv_set_info_async(struct obd_export *exp, obd_count keylen, - void *key, obd_count vallen, void *val, - struct ptlrpc_request_set *set) +int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, + obd_count keylen, void *key, obd_count vallen, + void *val, struct ptlrpc_request_set *set) { struct lmv_tgt_desc *tgt; struct obd_device *obd; @@ -2612,8 +2731,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, } lmv = &obd->u.lmv; - if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) || - KEY_IS(KEY_INIT_RECOV_BACKUP)) { + if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) { int i, err = 0; for (i = 0; i < lmv->desc.ld_tgt_count; i++) { @@ -2622,7 +2740,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, if (!tgt->ltd_exp) continue; - err = obd_set_info_async(tgt->ltd_exp, + err = obd_set_info_async(env, tgt->ltd_exp, keylen, key, vallen, val, set); if (err && rc == 0) rc = err; @@ -2650,13 +2768,13 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, RETURN(mea_size); if (*lmmp && !lsm) { - OBD_FREE(*lmmp, mea_size); + OBD_FREE_LARGE(*lmmp, mea_size); *lmmp = NULL; RETURN(0); } if (*lmmp == NULL) { - OBD_ALLOC(*lmmp, mea_size); + OBD_ALLOC_LARGE(*lmmp, mea_size); if (*lmmp == NULL) RETURN(-ENOMEM); } @@ -2700,14 +2818,14 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, return mea_size; if (*lsmp != NULL && lmm == NULL) { - OBD_FREE(*tmea, mea_size); + OBD_FREE_LARGE(*tmea, mea_size); *lsmp = NULL; RETURN(0); } LASSERT(mea_size == lmm_size); - OBD_ALLOC(*tmea, mea_size); + OBD_ALLOC_LARGE(*tmea, mea_size); if (*tmea == NULL) RETURN(-ENOMEM); @@ -2740,7 +2858,7 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, ldlm_policy_data_t *policy, ldlm_mode_t mode, - int flags, void *opaque) + ldlm_cancel_flags_t flags, void *opaque) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2764,7 +2882,7 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, } int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, - __u32 *bits) + __u64 *bits) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2922,7 +3040,7 @@ int lmv_intent_getattr_async(struct obd_export *exp, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_object *obj; - struct lmv_tgt_desc *tgt; + struct lmv_tgt_desc *tgt = NULL; int rc; int sidx; ENTRY; @@ -2931,36 +3049,21 @@ int lmv_intent_getattr_async(struct obd_export *exp, if (rc) RETURN(rc); - if (!fid_is_sane(&op_data->op_fid2)) { + if (op_data->op_namelen) { obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj && op_data->op_namelen) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, + if (obj) { + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)op_data->op_name, op_data->op_namelen); op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, - obj->lo_stripes[sidx].ls_mds); - CDEBUG(D_INODE, - "Choose slave dir ("DFID") -> mds #%d\n", - PFID(&op_data->op_fid1), tgt->ltd_idx); - } else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - } - if (obj) + tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); lmv_object_put(obj); - } else { - op_data->op_fid1 = op_data->op_fid2; - tgt = lmv_find_target(lmv, &op_data->op_fid2); - op_data->op_bias = MDS_CROSS_REF; - /* - * Unfortunately, we have to lie to MDC/MDS to retrieve - * attributes llite needs. - */ - if (minfo->mi_it.it_op & IT_LOOKUP) - minfo->mi_it.it_op = IT_GETATTR; + } } + if (tgt == NULL) + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2969,7 +3072,7 @@ int lmv_intent_getattr_async(struct obd_export *exp, } int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid, __u32 *bits) + struct lu_fid *fid, __u64 *bits) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2989,6 +3092,83 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, RETURN(rc); } +/** + * For lmv, only need to send request to master MDT, and the master MDT will + * process with other slave MDTs. The only exception is Q_GETOQUOTA for which + * we directly fetch data from the slave MDTs. + */ +int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = &lmv->tgts[0]; + int rc = 0, i; + __u64 curspace, curinodes; + ENTRY; + + if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) { + CERROR("master lmv inactive\n"); + RETURN(-EIO); + } + + if (oqctl->qc_cmd != Q_GETOQUOTA) { + rc = obd_quotactl(tgt->ltd_exp, oqctl); + RETURN(rc); + } + + curspace = curinodes = 0; + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + int err; + tgt = &lmv->tgts[i]; + + if (tgt->ltd_exp == NULL) + continue; + if (!tgt->ltd_active) { + CDEBUG(D_HA, "mdt %d is inactive.\n", i); + continue; + } + + err = obd_quotactl(tgt->ltd_exp, oqctl); + if (err) { + CERROR("getquota on mdt %d failed. %d\n", i, err); + if (!rc) + rc = err; + } else { + curspace += oqctl->qc_dqblk.dqb_curspace; + curinodes += oqctl->qc_dqblk.dqb_curinodes; + } + } + oqctl->qc_dqblk.dqb_curspace = curspace; + oqctl->qc_dqblk.dqb_curinodes = curinodes; + + RETURN(rc); +} + +int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int i, rc = 0; + ENTRY; + + for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) { + int err; + + if (!tgt->ltd_active) { + CERROR("lmv idx %d inactive\n", i); + RETURN(-EIO); + } + + err = obd_quotacheck(tgt->ltd_exp, oqctl); + if (err && !rc) + rc = err; + } + + RETURN(rc); +} struct obd_ops lmv_obd_ops = { .o_owner = THIS_MODULE, @@ -3006,12 +3186,15 @@ struct obd_ops lmv_obd_ops = { .o_notify = lmv_notify, .o_get_uuid = lmv_get_uuid, .o_iocontrol = lmv_iocontrol, - .o_fid_delete = lmv_fid_delete + .o_fid_delete = lmv_fid_delete, + .o_quotacheck = lmv_quotacheck, + .o_quotactl = lmv_quotactl }; struct md_ops lmv_md_ops = { .m_getstatus = lmv_getstatus, .m_change_cbdata = lmv_change_cbdata, + .m_find_cbdata = lmv_find_cbdata, .m_close = lmv_close, .m_create = lmv_create, .m_done_writing = lmv_done_writing, @@ -3042,9 +3225,6 @@ struct md_ops lmv_md_ops = { .m_revalidate_lock = lmv_revalidate_lock }; -static quota_interface_t *quota_interface; -extern quota_interface_t lmv_quota_interface; - int __init lmv_init(void) { struct lprocfs_static_vars lvars; @@ -3060,17 +3240,10 @@ int __init lmv_init(void) lprocfs_lmv_init_vars(&lvars); - cfs_request_module("lquota"); - quota_interface = PORTAL_SYMBOL_GET(lmv_quota_interface); - init_obd_quota_ops(quota_interface, &lmv_obd_ops); - rc = class_register_type(&lmv_obd_ops, &lmv_md_ops, lvars.module_vars, LUSTRE_LMV_NAME, NULL); - if (rc) { - if (quota_interface) - PORTAL_SYMBOL_PUT(lmv_quota_interface); + if (rc) cfs_mem_cache_destroy(lmv_object_cache); - } return rc; } @@ -3078,9 +3251,6 @@ int __init lmv_init(void) #ifdef __KERNEL__ static void lmv_exit(void) { - if (quota_interface) - PORTAL_SYMBOL_PUT(lmv_quota_interface); - class_unregister_type(LUSTRE_LMV_NAME); LASSERTF(cfs_atomic_read(&lmv_object_count) == 0,