X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=d89782698f7395f627714058b7c132e37de3655a;hb=322968acf183ab16d952cd3026f6580957b31259;hp=004218ee7d805cb2a5a346756132879509ba4aff;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 004218e..d897826 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -26,10 +26,13 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* + * Copyright (c) 2011 Whamcloud, Inc. + */ +/* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ @@ -52,7 +55,6 @@ #include #endif -#include #include #include #include @@ -65,7 +67,7 @@ /* object cache. */ cfs_mem_cache_t *lmv_object_cache; -atomic_t lmv_object_count = ATOMIC_INIT(0); +cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0); static void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, @@ -97,7 +99,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n", lmv, uuid->uuid, activate); - spin_lock(&lmv->lmv_lock); + cfs_spin_lock(&lmv->lmv_lock); for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) { if (tgt->ltd_exp == NULL) continue; @@ -133,7 +135,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, EXIT; out_lmv_lock: - spin_unlock(&lmv->lmv_lock); + cfs_spin_unlock(&lmv->lmv_lock); return rc; } @@ -146,7 +148,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid, LASSERT(data != NULL); - spin_lock(&lmv->lmv_lock); + cfs_spin_lock(&lmv->lmv_lock); for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) { if (tgt->ltd_exp == NULL) continue; @@ -156,7 +158,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid, break; } } - spin_unlock(&lmv->lmv_lock); + cfs_spin_unlock(&lmv->lmv_lock); RETURN(0); } @@ -239,7 +241,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched, * caller that everything is okay. Real connection will be performed later. */ static int lmv_connect(const struct lu_env *env, - struct lustre_handle *conn, struct obd_device *obd, + struct obd_export **exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, void *localdata) { @@ -247,29 +249,30 @@ static int lmv_connect(const struct lu_env *env, struct proc_dir_entry *lmv_proc_dir; #endif struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *exp; + struct lustre_handle conn = { 0 }; int rc = 0; ENTRY; - rc = class_connect(conn, obd, cluuid); - if (rc) { - CERROR("class_connection() returned %d\n", rc); - RETURN(rc); - } - - exp = class_conn2export(conn); - /* * We don't want to actually do the underlying connections more than * once, so keep track. */ lmv->refcount++; if (lmv->refcount > 1) { - class_export_put(exp); + *exp = NULL; RETURN(0); } - lmv->exp = exp; + rc = class_connect(&conn, obd, cluuid); + if (rc) { + CERROR("class_connection() returned %d\n", rc); + RETURN(rc); + } + + *exp = class_conn2export(&conn); + class_export_get(*exp); + + lmv->exp = *exp; lmv->connected = 0; lmv->cluuid = *cluuid; @@ -383,7 +386,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) struct obd_uuid *cluuid = &lmv->cluuid; struct obd_connect_data *mdc_data = NULL; struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" }; - struct lustre_handle conn = {0, }; struct obd_device *mdc_obd; struct obd_export *mdc_exp; struct lu_fld_target target; @@ -407,15 +409,13 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) RETURN(-EINVAL); } - rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid, + rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid, &lmv->conn_data, NULL); if (rc) { CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc); RETURN(rc); } - mdc_exp = class_conn2export(&conn); - /* * Init fid sequence client for this mdc and add new fld target. */ @@ -465,22 +465,20 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n", mdc_obd->obd_name, mdc_obd->obd_uuid.uuid, - atomic_read(&obd->obd_refcount)); + cfs_atomic_read(&obd->obd_refcount)); #ifdef __KERNEL__ lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds"); if (lmv_proc_dir) { struct proc_dir_entry *mdc_symlink; - char name[MAX_STRING_SIZE + 1]; LASSERT(mdc_obd->obd_type != NULL); LASSERT(mdc_obd->obd_type->typ_name != NULL); - name[MAX_STRING_SIZE] = '\0'; - snprintf(name, MAX_STRING_SIZE, "../../../%s/%s", - mdc_obd->obd_type->typ_name, - mdc_obd->obd_name); - mdc_symlink = proc_symlink(mdc_obd->obd_name, - lmv_proc_dir, name); + mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name, + lmv_proc_dir, + "../../../%s/%s", + mdc_obd->obd_type->typ_name, + mdc_obd->obd_name); if (mdc_symlink == NULL) { CERROR("Could not register LMV target " "/proc/fs/lustre/%s/%s/target_obds/%s.", @@ -523,24 +521,24 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid) RETURN(-EINVAL); } - rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, 0, NULL, tgt_uuid); + rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, NULL); if (rc) { lmv_init_unlock(lmv); CERROR("lmv failed to setup llogging subsystems\n"); } } - spin_lock(&lmv->lmv_lock); + cfs_spin_lock(&lmv->lmv_lock); tgt = lmv->tgts + lmv->desc.ld_tgt_count++; tgt->ltd_uuid = *tgt_uuid; - spin_unlock(&lmv->lmv_lock); + cfs_spin_unlock(&lmv->lmv_lock); if (lmv->connected) { rc = lmv_connect_mdc(obd, tgt); if (rc) { - spin_lock(&lmv->lmv_lock); + cfs_spin_lock(&lmv->lmv_lock); lmv->desc.ld_tgt_count--; memset(tgt, 0, sizeof(*tgt)); - spin_unlock(&lmv->lmv_lock); + cfs_spin_unlock(&lmv->lmv_lock); } else { int easize = sizeof(struct lmv_stripe_md) + lmv->desc.ld_tgt_count * @@ -630,8 +628,11 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_obd = class_exp2obd(tgt->ltd_exp); - if (mdc_obd) + if (mdc_obd) { + mdc_obd->obd_force = obd->obd_force; + mdc_obd->obd_fail = obd->obd_fail; mdc_obd->obd_no_recov = obd->obd_no_recov; + } #ifdef __KERNEL__ lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds"); @@ -742,8 +743,6 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, __u32 index; memcpy(&index, data->ioc_inlbuf2, sizeof(__u32)); - LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs)); - if ((index >= count)) RETURN(-ENODEV); @@ -754,14 +753,20 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, if (!mdc_obd) RETURN(-EINVAL); + /* copy UUID */ + if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), + min((int) data->ioc_plen2, + (int) sizeof(struct obd_uuid)))) + RETURN(-EFAULT); + rc = obd_statfs(mdc_obd, &stat_buf, - cfs_time_current_64() - HZ, 0); + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + 0); if (rc) RETURN(rc); - if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1)) - RETURN(-EFAULT); - if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), - data->ioc_plen2)) + if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf, + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) RETURN(-EFAULT); break; } @@ -811,13 +816,33 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, OBD_FREE_PTR(oqctl); break; } + case OBD_IOC_CHANGELOG_SEND: + case OBD_IOC_CHANGELOG_CLEAR: { + struct ioc_changelog *icc = karg; + + if (icc->icc_mdtindex >= count) + RETURN(-ENODEV); + + rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp, + sizeof(*icc), icc, NULL); + break; + } + case LL_IOC_GET_CONNECT_FLAGS: { + rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg); + break; + } + default : { for (i = 0; i < count; i++) { int err; + struct obd_device *mdc_obd; if (lmv->tgts[i].ltd_exp == NULL) continue; - + /* ll_umount_begin() sets force flag but for lmv, not + * mdc. Let's pass it through */ + mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp); + mdc_obd->obd_force = obddev->obd_force; err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg); if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) { @@ -967,7 +992,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, * New seq alloc and FLD setup should be atomic. Otherwise we may find * on server that seq in new allocated fid is not yet known. */ - down(&tgt->ltd_fid_sem); + cfs_down(&tgt->ltd_fid_sem); if (!tgt->ltd_active) GOTO(out, rc = -ENODEV); @@ -978,25 +1003,12 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL); if (rc > 0) { LASSERT(fid_is_sane(fid)); - - /* - * Client switches to new sequence, setup FLD. - */ - rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid), - mds, NULL); - if (rc) { - /* - * Delete just allocated fid sequence in case - * of fail back. - */ - CERROR("Can't create fld entry, rc %d\n", rc); - obd_fid_delete(tgt->ltd_exp, NULL); - } + rc = 0; } EXIT; out: - up(&tgt->ltd_fid_sem); + cfs_up(&tgt->ltd_fid_sem); return rc; } @@ -1067,7 +1079,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(-ENOMEM); for (i = 0; i < LMV_MAX_TGT_COUNT; i++) { - sema_init(&lmv->tgts[i].ltd_fid_sem, 1); + cfs_sema_init(&lmv->tgts[i].ltd_fid_sem, 1); lmv->tgts[i].ltd_idx = i; } @@ -1085,8 +1097,8 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lmv->max_easize = 0; lmv->lmv_placement = PLACEMENT_CHAR_POLICY; - spin_lock_init(&lmv->lmv_lock); - sema_init(&lmv->init_sem, 1); + cfs_spin_lock_init(&lmv->lmv_lock); + cfs_sema_init(&lmv->init_sem, 1); rc = lmv_object_setup(obd); if (rc) { @@ -1098,10 +1110,11 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lprocfs_obd_setup(obd, lvars.obd_vars); #ifdef LPROCFS { - rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd_status", + rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd", 0444, &lmv_proc_target_fops, obd); if (rc) - CWARN("Error adding target_obd_stats file (%d)\n", rc); + CWARN("%s: error adding LMV target_obd file: rc = %d\n", + obd->obd_name, rc); } #endif rc = fld_client_init(&lmv->lmv_fld, obd->obd_name, @@ -1128,7 +1141,6 @@ static int lmv_cleanup(struct obd_device *obd) ENTRY; fld_client_fini(&lmv->lmv_fld); - lprocfs_obd_cleanup(obd); lmv_object_cleanup(obd); OBD_FREE(lmv->datas, lmv->datas_size); OBD_FREE(lmv->tgts, lmv->tgts_size); @@ -1274,8 +1286,7 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, obd_valid valid, int ea_size, +static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; @@ -1290,17 +1301,22 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt = lmv_find_target(lmv, fid); + tgt = lmv_find_target(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - rc = md_getattr(tgt->ltd_exp, fid, oc, valid, ea_size, request); + if (op_data->op_valid & OBD_MD_MDTIDX) { + op_data->op_mds = tgt->ltd_idx; + RETURN(0); + } + + rc = md_getattr(tgt->ltd_exp, op_data, request); if (rc) RETURN(rc); - obj = lmv_object_find_lock(obd, fid); + obj = lmv_object_find_lock(obd, &op_data->op_fid1); - CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(fid), + CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1), obj ? "(split)" : ""); /* @@ -1370,6 +1386,36 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, RETURN(0); } +static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid, + ldlm_iterator_t it, void *data) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int i; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); + + /* + * With CMD every object can have two locks in different namespaces: + * lookup lock in space of mds storing direntry and update/open lock in + * space of mds storing inode. + */ + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data); + if (rc) + RETURN(rc); + } + + RETURN(rc); +} + + static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, struct md_open_data *mod, struct ptlrpc_request **request) { @@ -1404,6 +1450,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) struct lmv_tgt_desc *tgt; struct lmv_object *obj; struct lustre_md md; + struct md_op_data *op_data; int mealen; int rc; __u64 valid; @@ -1421,7 +1468,17 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) /* * Time to update mea of parent fid. */ - rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req); + + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + RETURN(-ENOMEM); + + op_data->op_fid1 = *fid; + op_data->op_mode = mealen; + op_data->op_valid = valid; + + rc = md_getattr(tgt->ltd_exp, op_data, &req); + OBD_FREE_PTR(op_data); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, rc); @@ -1498,7 +1555,7 @@ repeat: else if (rc) RETURN(rc); - CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n", + CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n", op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), op_data->op_mds); @@ -1739,18 +1796,17 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, } static int -lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, const char *name, int namelen, - obd_valid valid, int ea_size, __u32 suppgid, +lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data, struct ptlrpc_request **request) { struct ptlrpc_request *req = NULL; struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lu_fid rid = *fid; + struct lu_fid rid = op_data->op_fid1; struct lmv_tgt_desc *tgt; struct mdt_body *body; struct lmv_object *obj; + obd_valid valid = op_data->op_valid; int rc; int loop = 0; int sidx; @@ -1766,23 +1822,27 @@ repeat: obj = lmv_object_find(obd, &rid); if (obj) { sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - name, namelen - 1); + op_data->op_name, op_data->op_namelen); rid = obj->lo_stripes[sidx].ls_fid; tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); + op_data->op_mds = obj->lo_stripes[sidx].ls_mds; valid &= ~OBD_MD_FLCKSPLIT; lmv_object_put(obj); } else { tgt = lmv_find_target(lmv, &rid); valid |= OBD_MD_FLCKSPLIT; + op_data->op_mds = tgt->ltd_idx; } if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n", - namelen, name, PFID(fid), PFID(&rid), tgt->ltd_idx); + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + PFID(&rid), tgt->ltd_idx); - rc = md_getattr_name(tgt->ltd_exp, &rid, oc, name, namelen, valid, - ea_size, suppgid, request); + op_data->op_valid = valid; + op_data->op_fid1 = rid; + rc = md_getattr_name(tgt->ltd_exp, op_data, request); if (rc == 0) { body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); @@ -1799,9 +1859,11 @@ repeat: RETURN(PTR_ERR(tgt)); } - rc = md_getattr_name(tgt->ltd_exp, &rid, NULL, NULL, - 1, valid | OBD_MD_FLCROSSREF, - ea_size, suppgid, &req); + op_data->op_fid1 = rid; + op_data->op_valid |= OBD_MD_FLCROSSREF; + op_data->op_namelen = 0; + op_data->op_name = NULL; + rc = md_getattr_name(tgt->ltd_exp, op_data, &req); ptlrpc_req_finished(*request); *request = req; } @@ -1861,7 +1923,7 @@ static int lmv_early_cancel_slaves(struct obd_export *exp, CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n", PFID(st_fid), tgt->ltd_idx); rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, - mode, LDLM_FL_ASYNC, NULL); + mode, LCF_ASYNC, NULL); if (rc) GOTO(out_put_obj, rc); } else { @@ -1912,7 +1974,7 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); policy.l_inodebits.bits = bits; rc = md_cancel_unused(tgt->ltd_exp, fid, &policy, - mode, LDLM_FL_ASYNC, NULL); + mode, LCF_ASYNC, NULL); } else { CDEBUG(D_INODE, "EARLY_CANCEL skip operation target %d on "DFID"\n", @@ -1970,11 +2032,11 @@ repeat: RETURN(rc); } - CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n", + CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n", mds, PFID(&op_data->op_fid1)); - op_data->op_fsuid = current->fsuid; - op_data->op_fsgid = current->fsgid; + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); op_data->op_cap = cfs_curproc_cap_pack(); tgt = lmv_get_target(lmv, mds); @@ -2012,7 +2074,6 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *src_tgt; - struct lmv_tgt_desc *tgt_tgt; int rc; int sidx; int loop = 0; @@ -2067,12 +2128,11 @@ repeat: RETURN(rc); } - op_data->op_fsuid = current->fsuid; - op_data->op_fsgid = current->fsgid; + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); op_data->op_cap = cfs_curproc_cap_pack(); src_tgt = lmv_get_target(lmv, mds1); - tgt_tgt = lmv_get_target(lmv, mds2); /* * LOOKUP lock on src child (fid3) should also be cancelled for @@ -2261,7 +2321,7 @@ static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj) val = le64_to_cpu(*hash); if (val < hash_adj) val += MAX_HASH_SIZE; - if (val != DIR_END_OFF) + if (val != MDS_DIR_END_OFF) *hash = cpu_to_le64(val - hash_adj); } @@ -2283,7 +2343,8 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid) } static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, __u64 offset64, struct page *page, + struct obd_capa *oc, __u64 offset64, + struct page **pages, unsigned npages, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; @@ -2299,6 +2360,11 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, int tgt0_idx = 0; int rc; int nr = 0; + int i; + /* number of pages read, in CFS_PAGE_SIZE */ + int nrdpgs; + /* number of pages transferred in LU_PAGE_SIZE */ + int nlupgs; struct lmv_stripe *los; struct lmv_tgt_desc *tgt; struct lu_dirpage *dp; @@ -2376,34 +2442,101 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, if (IS_ERR(tgt)) GOTO(cleanup, rc = PTR_ERR(tgt)); - rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request); + rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, pages, npages, + request); if (rc) GOTO(cleanup, rc); - if (obj) { - dp = cfs_kmap(page); - lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); - lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); - LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64); + nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1) + >> CFS_PAGE_SHIFT; + nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; + LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); + LASSERT(nrdpgs > 0 && nrdpgs <= npages); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) - lmv_hash_adjust(&ent->lde_hash, hash_adj); + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, npages); - if (tgt0_idx != nr - 1) { - __u64 end; + for (i = 0; i < nrdpgs; i++) { +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + struct lu_dirpage *first; + __u64 hash_end = 0; + __u32 flags = 0; +#endif + struct lu_dirent *tmp = NULL; + + dp = cfs_kmap(pages[i]); + if (obj) { + lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); + lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); + LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64); - end = le64_to_cpu(dp->ldp_hash_end); - if (end == DIR_END_OFF) { + if ((tgt0_idx != nr - 1) && + (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF)) + { dp->ldp_hash_end = cpu_to_le32(seg_size * (tgt0_idx + 1)); CDEBUG(D_INODE, ""DFID" reset end "LPX64" tgt %d\n", PFID(&rid), - le64_to_cpu(dp->ldp_hash_end), tgt_idx); + (__u64)le64_to_cpu(dp->ldp_hash_end), + tgt_idx); + } + } + + ent = lu_dirent_start(dp); +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + first = dp; + hash_end = dp->ldp_hash_end; +repeat: +#endif + nlupgs--; + for (tmp = ent; ent != NULL; + tmp = ent, ent = lu_dirent_next(ent)) { + if (obj) + lmv_hash_adjust(&ent->lde_hash, hash_adj); + } + +#if CFS_PAGE_SIZE > LU_PAGE_SIZE + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) { + ent = lu_dirent_start(dp); + + if (obj) { + lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); + if ((tgt0_idx != nr - 1) && + (le64_to_cpu(dp->ldp_hash_end) == + MDS_DIR_END_OFF)) { + hash_end = cpu_to_le32(seg_size * + (tgt0_idx + 1)); + CDEBUG(D_INODE, + ""DFID" reset end "LPX64" tgt %d\n", + PFID(&rid), + (__u64)le64_to_cpu(hash_end), + tgt_idx); + } + } + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; + + if (tmp) { + /* enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage, in this way + * several lu_dirpages can be stored into one + * client page on client. */ + tmp = ((void *)tmp) + + le16_to_cpu(tmp->lde_reclen); + tmp->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)tmp); + goto repeat; } } - cfs_kunmap(page); + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); +#else + SET_BUT_UNUSED(tmp); +#endif + cfs_kunmap(pages[i]); } EXIT; cleanup: @@ -2456,8 +2589,8 @@ repeat: op_data->op_bias |= MDS_CHECK_SPLIT; } - op_data->op_fsuid = current->fsuid; - op_data->op_fsgid = current->fsgid; + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); op_data->op_cap = cfs_curproc_cap_pack(); /* @@ -2498,7 +2631,8 @@ repeat: static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { - int rc = 0; + struct lmv_obd *lmv = &obd->u.lmv; + int rc = 0; switch (stage) { case OBD_CLEANUP_EARLY: @@ -2506,6 +2640,8 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) * stack. */ break; case OBD_CLEANUP_EXPORTS: + fld_client_proc_fini(&lmv->lmv_fld); + lprocfs_obd_cleanup(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -2574,6 +2710,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, ((struct obd_connect_data *)val)->ocd_connect_flags; } RETURN(rc); + } else if (KEY_IS(KEY_TGT_COUNT)) { + *((int *)val) = lmv->desc.ld_tgt_count; + RETURN(0); } CDEBUG(D_IOCTL, "Invalid key\n"); @@ -2598,8 +2737,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, } lmv = &obd->u.lmv; - if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) || - KEY_IS(KEY_INIT_RECOV_BACKUP)) { + if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) { int i, err = 0; for (i = 0; i < lmv->desc.ld_tgt_count; i++) { @@ -2636,13 +2774,13 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, RETURN(mea_size); if (*lmmp && !lsm) { - OBD_FREE(*lmmp, mea_size); + OBD_FREE_LARGE(*lmmp, mea_size); *lmmp = NULL; RETURN(0); } if (*lmmp == NULL) { - OBD_ALLOC(*lmmp, mea_size); + OBD_ALLOC_LARGE(*lmmp, mea_size); if (*lmmp == NULL) RETURN(-ENOMEM); } @@ -2686,14 +2824,14 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, return mea_size; if (*lsmp != NULL && lmm == NULL) { - OBD_FREE(*tmea, mea_size); + OBD_FREE_LARGE(*tmea, mea_size); *lsmp = NULL; RETURN(0); } LASSERT(mea_size == lmm_size); - OBD_ALLOC(*tmea, mea_size); + OBD_ALLOC_LARGE(*tmea, mea_size); if (*tmea == NULL) RETURN(-ENOMEM); @@ -2726,7 +2864,7 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, ldlm_policy_data_t *policy, ldlm_mode_t mode, - int flags, void *opaque) + ldlm_cancel_flags_t flags, void *opaque) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2749,13 +2887,15 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) +int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, + __u64 *bits) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; int rc; ENTRY; - rc = md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data); + + rc = md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits); RETURN(rc); } @@ -2906,7 +3046,7 @@ int lmv_intent_getattr_async(struct obd_export *exp, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_object *obj; - struct lmv_tgt_desc *tgt; + struct lmv_tgt_desc *tgt = NULL; int rc; int sidx; ENTRY; @@ -2915,36 +3055,21 @@ int lmv_intent_getattr_async(struct obd_export *exp, if (rc) RETURN(rc); - if (!fid_is_sane(&op_data->op_fid2)) { + if (op_data->op_namelen) { obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj && op_data->op_namelen) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, + if (obj) { + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)op_data->op_name, op_data->op_namelen); op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, - obj->lo_stripes[sidx].ls_mds); - CDEBUG(D_INODE, - "Choose slave dir ("DFID") -> mds #%d\n", - PFID(&op_data->op_fid1), tgt->ltd_idx); - } else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - } - if (obj) + tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); lmv_object_put(obj); - } else { - op_data->op_fid1 = op_data->op_fid2; - tgt = lmv_find_target(lmv, &op_data->op_fid2); - op_data->op_bias = MDS_CROSS_REF; - /* - * Unfortunately, we have to lie to MDC/MDS to retrieve - * attributes llite needs. - */ - if (minfo->mi_it.it_op & IT_LOOKUP) - minfo->mi_it.it_op = IT_GETATTR; + } } + if (tgt == NULL) + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2952,9 +3077,8 @@ int lmv_intent_getattr_async(struct obd_export *exp, RETURN(rc); } -int lmv_revalidate_lock(struct obd_export *exp, - struct lookup_intent *it, - struct lu_fid *fid) +int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, + struct lu_fid *fid, __u64 *bits) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2970,10 +3094,87 @@ int lmv_revalidate_lock(struct obd_export *exp, if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - rc = md_revalidate_lock(tgt->ltd_exp, it, fid); + rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits); + RETURN(rc); +} + +/** + * For lmv, only need to send request to master MDT, and the master MDT will + * process with other slave MDTs. The only exception is Q_GETOQUOTA for which + * we directly fetch data from the slave MDTs. + */ +int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = &lmv->tgts[0]; + int rc = 0, i; + __u64 curspace, curinodes; + ENTRY; + + if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) { + CERROR("master lmv inactive\n"); + RETURN(-EIO); + } + + if (oqctl->qc_cmd != Q_GETOQUOTA) { + rc = obd_quotactl(tgt->ltd_exp, oqctl); + RETURN(rc); + } + + curspace = curinodes = 0; + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + int err; + tgt = &lmv->tgts[i]; + + if (tgt->ltd_exp == NULL) + continue; + if (!tgt->ltd_active) { + CDEBUG(D_HA, "mdt %d is inactive.\n", i); + continue; + } + + err = obd_quotactl(tgt->ltd_exp, oqctl); + if (err) { + CERROR("getquota on mdt %d failed. %d\n", i, err); + if (!rc) + rc = err; + } else { + curspace += oqctl->qc_dqblk.dqb_curspace; + curinodes += oqctl->qc_dqblk.dqb_curinodes; + } + } + oqctl->qc_dqblk.dqb_curspace = curspace; + oqctl->qc_dqblk.dqb_curinodes = curinodes; + RETURN(rc); } +int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int i, rc = 0; + ENTRY; + + for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) { + int err; + + if (!tgt->ltd_active) { + CERROR("lmv idx %d inactive\n", i); + RETURN(-EIO); + } + + err = obd_quotacheck(tgt->ltd_exp, oqctl); + if (err && !rc) + rc = err; + } + + RETURN(rc); +} struct obd_ops lmv_obd_ops = { .o_owner = THIS_MODULE, @@ -2991,12 +3192,15 @@ struct obd_ops lmv_obd_ops = { .o_notify = lmv_notify, .o_get_uuid = lmv_get_uuid, .o_iocontrol = lmv_iocontrol, - .o_fid_delete = lmv_fid_delete + .o_fid_delete = lmv_fid_delete, + .o_quotacheck = lmv_quotacheck, + .o_quotactl = lmv_quotactl }; struct md_ops lmv_md_ops = { .m_getstatus = lmv_getstatus, .m_change_cbdata = lmv_change_cbdata, + .m_find_cbdata = lmv_find_cbdata, .m_close = lmv_close, .m_create = lmv_create, .m_done_writing = lmv_done_writing, @@ -3027,9 +3231,6 @@ struct md_ops lmv_md_ops = { .m_revalidate_lock = lmv_revalidate_lock }; -static quota_interface_t *quota_interface; -extern quota_interface_t lmv_quota_interface; - int __init lmv_init(void) { struct lprocfs_static_vars lvars; @@ -3045,17 +3246,10 @@ int __init lmv_init(void) lprocfs_lmv_init_vars(&lvars); - request_module("lquota"); - quota_interface = PORTAL_SYMBOL_GET(lmv_quota_interface); - init_obd_quota_ops(quota_interface, &lmv_obd_ops); - rc = class_register_type(&lmv_obd_ops, &lmv_md_ops, lvars.module_vars, LUSTRE_LMV_NAME, NULL); - if (rc) { - if (quota_interface) - PORTAL_SYMBOL_PUT(lmv_quota_interface); + if (rc) cfs_mem_cache_destroy(lmv_object_cache); - } return rc; } @@ -3063,14 +3257,11 @@ int __init lmv_init(void) #ifdef __KERNEL__ static void lmv_exit(void) { - if (quota_interface) - PORTAL_SYMBOL_PUT(lmv_quota_interface); - class_unregister_type(LUSTRE_LMV_NAME); - LASSERTF(atomic_read(&lmv_object_count) == 0, + LASSERTF(cfs_atomic_read(&lmv_object_count) == 0, "Can't free lmv objects cache, %d object(s) busy\n", - atomic_read(&lmv_object_count)); + cfs_atomic_read(&lmv_object_count)); cfs_mem_cache_destroy(lmv_object_cache); }