X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_intent.c;h=14f68b6b26254440791d2d6fefc3a400744c42ce;hb=f50d3d56f80184bc21f729e1640422d6a2a31158;hp=8726bdab0bb40bd139ad37f7ca5dc2084e989be6;hpb=386f2e3a877e214eecd69130a45b45d7570260a7;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 8726bda..14f68b6 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -1,7 +1,7 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc. * * This file is part of Lustre, http://www.lustre.org. * @@ -31,316 +31,502 @@ #include #include #include +#include #else #include #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include #include "lmv_internal.h" - static inline void lmv_drop_intent_lock(struct lookup_intent *it) { - if (it->d.lustre.it_lock_mode != 0) + if (it->d.lustre.it_lock_mode != 0) { ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle, it->d.lustre.it_lock_mode); + it->d.lustre.it_lock_mode = 0; + } } -int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, - void *lmm, int lmmsize, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +int lmv_intent_remote(struct obd_export *exp, void *lmm, + int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *body = NULL; - int rc = 0; + struct ptlrpc_request *req = NULL; + struct lustre_handle plock; + struct md_op_data *op_data; + struct obd_export *tgt_exp; + struct mdt_body *body; + int pmode, rc = 0; ENTRY; - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); + + if (!(body->valid & OBD_MD_MDS)) + RETURN(0); + + /* + * oh, MDS reports that this is remote inode case i.e. we have to ask + * for real attrs on another MDS. + */ + if (it->it_op & IT_LOOKUP) { + /* + * unfortunately, we have to lie to MDC/MDS to retrieve + * attributes llite needs. + */ + it->it_op = IT_GETATTR; + } - if (body->valid & OBD_MD_MDS) { - /* oh, MDS reports that this is remote inode case - * i.e. we have to ask for real attrs on another MDS */ - struct ptlrpc_request *req; - struct ll_fid nfid; - struct lustre_handle plock; - int pmode; - - if (it->it_op == IT_LOOKUP) { - /* unfortunately, we have to lie to MDC/MDS to - * retrieve attributes llite needs */ - it->it_op = IT_GETATTR; - } + /* we got LOOKUP lock, but we really need attrs */ + pmode = it->d.lustre.it_lock_mode; + if (pmode) { + plock.cookie = it->d.lustre.it_lock_handle; + it->d.lustre.it_lock_mode = 0; + it->d.lustre.it_data = 0; + } - /* we got LOOKUP lock, but we really need attrs */ - pmode = it->d.lustre.it_lock_mode; - if (pmode) { - memcpy(&plock, &it->d.lustre.it_lock_handle, - sizeof(plock)); - it->d.lustre.it_lock_mode = 0; - } + LASSERT(fid_is_sane(&body->fid1)); - nfid = body->fid1; - it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - rc = md_intent_lock(lmv->tgts[nfid.mds].ltd_exp, uctxt, &nfid, - NULL, 0, lmm, lmmsize, NULL, it, flags, - &req, cb_blocking); + it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - /* llite needs LOOKUP lock to track dentry revocation in - * order to maintain dcache consistency. thus drop UPDATE - * lock here and put LOOKUP in request */ - if (rc == 0) { - lmv_drop_intent_lock(it); - memcpy(&it->d.lustre.it_lock_handle, &plock, - sizeof(plock)); - it->d.lustre.it_lock_mode = pmode; - - } else if (pmode) - ldlm_lock_decref(&plock, pmode); + tgt_exp = lmv_find_export(lmv, &body->fid1); + if (IS_ERR(tgt_exp)) + GOTO(out, rc = PTR_ERR(tgt_exp)); - ptlrpc_req_finished(*reqp); - *reqp = req; + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + GOTO(out, rc = -ENOMEM); + + op_data->op_fid1 = body->fid1; + op_data->op_bias = MDS_CROSS_REF; + + rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags, + &req, cb_blocking, extra_lock_flags); + + /* + * llite needs LOOKUP lock to track dentry revocation in order to + * maintain dcache consistency. Thus drop UPDATE lock here and put + * LOOKUP in request. + */ + if (rc == 0) { + lmv_drop_intent_lock(it); + it->d.lustre.it_lock_handle = plock.cookie; + it->d.lustre.it_lock_mode = pmode; + } + + OBD_FREE_PTR(op_data); + EXIT; +out: + if (rc && pmode) + ldlm_lock_decref(&plock, pmode); + + ptlrpc_req_finished(*reqp); + *reqp = req; + return rc; +} + +int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid, + struct md_op_data *op, struct lu_fid *fid) +{ + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_obj *obj; + mdsno_t mds; + int mea_idx; + int rc; + ENTRY; + + obj = lmv_obj_grab(obd, pid); + if (!obj) { + CERROR("Object "DFID" should be split\n", + PFID(pid)); + RETURN(0); } + + mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + (char *)op->op_name, op->op_namelen); + mds = obj->lo_inodes[mea_idx].li_mds; + lmv_obj_put(obj); + + rc = __lmv_fid_alloc(lmv, fid, mds); + if (rc) { + CERROR("Can't allocate new fid, rc %d\n", + rc); + RETURN(rc); + } + + CDEBUG(D_INFO, "Allocate new fid "DFID" for split " + "obj\n", PFID(fid)); + RETURN(rc); } -int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +/* + * IT_OPEN is intended to open (and create, possible) an object. Parent (pid) + * may be split dir. + */ +int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data, + void *lmm, int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; + struct lu_fid rpid = op_data->op_fid1; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; + struct md_op_data *sop_data; + struct obd_export *tgt_exp; + struct lmv_stripe_md *mea; + struct mdt_body *body; struct lmv_obj *obj; - struct mea *mea; - int rc, mds; + int rc, loop = 0; ENTRY; - /* IT_OPEN is intended to open (and create, possible) an object. Parent - * (pfid) may be splitted dir */ + OBD_ALLOC_PTR(sop_data); + if (sop_data == NULL) + RETURN(-ENOMEM); + + /* save op_data fro repeat case */ + *sop_data = *op_data; repeat: - mds = rpfid.mds; - obj = lmv_grab_obj(obd, &rpfid); + + ++loop; + LASSERT(loop <= 2); + obj = lmv_obj_grab(obd, &rpid); if (obj) { - /* directory is already splitted, so we have to forward - * request to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)name, len); - CDEBUG(D_OTHER, "forward to MDS #%u\n", mds); + int mea_idx; + + /* + * Directory is already split, so we have to forward request to + * the right MDS. + */ + mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + + rpid = obj->lo_inodes[mea_idx].li_fid; + + sop_data->op_mds = obj->lo_inodes[mea_idx].li_mds; + tgt_exp = lmv_get_export(lmv, sop_data->op_mds); + sop_data->op_bias &= ~MDS_CHECK_SPLIT; + lmv_obj_put(obj); + CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid)); + } else { + struct lmv_tgt_desc *tgt; + + sop_data->op_bias |= MDS_CHECK_SPLIT; + tgt = lmv_find_target(lmv, &rpid); + sop_data->op_mds = tgt->ltd_idx; + tgt_exp = tgt->ltd_exp; + } + if (IS_ERR(tgt_exp)) + GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp)); + + sop_data->op_fid1 = rpid; + + if (it->it_op & IT_CREAT) { + /* + * For open with IT_CREATE and for IT_CREATE cases allocate new + * fid and setup FLD for it. + */ + rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data); + if (rc) + GOTO(out_free_sop_data, rc); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); + if (rc == -ERESTART) + goto repeat; + else if (rc) + GOTO(out_free_sop_data, rc); } - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, cfid, it, flags, reqp, - cb_blocking); + rc = md_intent_lock(tgt_exp, sop_data, lmm, lmmsize, it, flags, + reqp, cb_blocking, extra_lock_flags); + if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ - LASSERT(fid_equal(pfid, &rpfid)); - rc = lmv_get_mea_and_update_object(exp, &rpfid); + LASSERT(*reqp != NULL); + DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, + "Got -ERESTART during open!\n"); + ptlrpc_req_finished(*reqp); + *reqp = NULL; + it->d.lustre.it_data = 0; + + /* + * Directory got split. Time to update local object and repeat + * the request with proper MDS. + */ + LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid)); + rc = lmv_handle_split(exp, &rpid); if (rc == 0) { - ptlrpc_req_finished(*reqp); - goto repeat; + /* We should reallocate child FID. */ + rc = lmv_alloc_slave_fids(obd, &rpid, op_data, + &sop_data->op_fid2); + if (rc == 0) + goto repeat; } } - if (rc != 0) - RETURN(rc); - /* okay, MDS has returned success. Probably name has been resolved in - * remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, - flags, reqp, cb_blocking); + if (rc != 0) + GOTO(out_free_sop_data, rc); + + /* + * Okay, MDS has returned success. Probably name has been resolved in + * remote inode. + */ + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, + cb_blocking, extra_lock_flags); if (rc != 0) { LASSERT(rc < 0); - RETURN(rc); + /* + * This is possible, that some userspace application will try to + * open file as directory and we will have -ENOTDIR here. As + * this is normal situation, we should not print error here, + * only debug info. + */ + CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):" + "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2), + PFID(&rpid), op_data->op_namelen, op_data->op_name, rc); + GOTO(out_free_sop_data, rc); } + /* + * Nothing is found, do not access body->fid1 as it is zero and thus + * pointless. + */ + if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) && + !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) && + !(it->d.lustre.it_disposition & DISP_OPEN_OPEN)) + GOTO(out_free_sop_data, rc = 0); + /* caller may use attrs MDS returns on IT_OPEN lock request so, we have - * to update them for splitted dir */ - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); + * to update them for split dir */ + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); - cfid = &body->fid1; - obj = lmv_grab_obj(obd, cfid); - if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) { - /* wow! this is splitted dir, we'd like to handle it */ - obj = lmv_create_obj(exp, &body->fid1, mea); + /* could not find object, FID is not present in response. */ + if (!(body->valid & OBD_MD_FLID)) + GOTO(out_free_sop_data, rc = 0); + + obj = lmv_obj_grab(obd, &body->fid1); + if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) { + /* FIXME: capability for remote! */ + /* wow! this is split dir, we'd like to handle it */ + obj = lmv_obj_create(exp, &body->fid1, mea); if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); + GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); } if (obj) { - /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation); - rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + /* This is split dir and we'd want to get attrs. */ + CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n", + PFID(&body->fid1)); + + rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, + cb_blocking, extra_lock_flags); } else if (S_ISDIR(body->mode)) { - /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation);*/ + CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n", + PFID(&body->fid1)); } - + if (obj) - lmv_put_obj(obj); - - RETURN(rc); + lmv_obj_put(obj); + + EXIT; +out_free_sop_data: + OBD_FREE_PTR(sop_data); + return rc; } -int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, + void *lmm, int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { + struct lmv_obj *obj = NULL, *obj2 = NULL; struct obd_device *obd = exp->exp_obd; + struct lu_fid rpid = op_data->op_fid1; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; - struct lmv_obj *obj, *obj2; - struct mea *mea; - int rc = 0, mds; + struct md_op_data *sop_data; + struct lmv_stripe_md *mea; + struct mdt_body *body; + mdsno_t mds; + int rc = 0; ENTRY; - if (cfid) { - /* caller wants to revalidate attrs of obj we have to revalidate - * slaves if requested object is splitted directory */ - CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation); - mds = cfid->mds; - obj = lmv_grab_obj(obd, cfid); + OBD_ALLOC_PTR(sop_data); + if (sop_data == NULL) + RETURN(-ENOMEM); + + /* save op_data fro repeat case */ + *sop_data = *op_data; + + if (fid_is_sane(&op_data->op_fid2)) { + /* + * Caller wants to revalidate attrs of obj we have to revalidate + * slaves if requested object is split directory. + */ + CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", + PFID(&op_data->op_fid2)); + + rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds); + if (rc) + GOTO(out_free_sop_data, rc); +#if 0 + /* + * In fact, we do not need this with current intent_lock(), but + * it may change some day. + */ + obj = lmv_obj_grab(obd, &op_data->op_fid2); if (obj) { - /* in fact, we need not this with current intent_lock(), - * but it may change some day */ - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); + if (!lu_fid_eq(&op_data->op_fid1, &op_data->op_fid2)){ + rpid = obj->lo_inodes[mds].li_fid; + mds = obj->lo_inodes[mds].li_mds; + } + lmv_obj_put(obj); } - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, cfid, it, flags, reqp, - cb_blocking); - if (obj && rc >= 0) { - /* this is splitted dir. In order to optimize things a - * bit, we consider obj valid updating missing parts. - - * FIXME: do we need to return any lock here? It would - * be fine if we don't. this means that nobody should - * use UPDATE lock to notify about object * removal */ - CDEBUG(D_OTHER, - "revalidate slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, rc); - - rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc, - cb_blocking); +#endif + } else { + CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n", + op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1)); + + rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); + if (rc) + GOTO(out_free_sop_data, rc); + obj = lmv_obj_grab(obd, &op_data->op_fid1); + if (obj && op_data->op_namelen) { + int mea_idx; + + /* directory is already split. calculate mds */ + mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + rpid = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; + sop_data->op_bias &= ~MDS_CHECK_SPLIT; + lmv_obj_put(obj); + + CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n", + mds, PFID(&rpid)); + } else { + rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); + if (rc) + GOTO(out_free_sop_data, rc); + sop_data->op_bias |= MDS_CHECK_SPLIT; } - - RETURN(rc); } - CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n", - len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id, - (unsigned long)pfid->generation); - - mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid); - if (obj && len) { - /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *) name, len); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); - - CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n", - mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); - } - - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, NULL, it, flags, reqp, - cb_blocking); - + sop_data->op_fid1 = rpid; + + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, + lmmsize, it, flags, reqp, cb_blocking, + extra_lock_flags); + + LASSERTF(rc != -ERESTART, "GETATTR: Got unhandled -ERESTART!\n"); if (rc < 0) - RETURN(rc); - - LASSERT(rc == 0); + GOTO(out_free_sop_data, rc); + + if (obj && rc > 0) { + /* + * This is split dir. In order to optimize things a bit, we + * consider obj valid updating missing parts. + + * FIXME: do we need to return any lock here? It would be fine + * if we don't. This means that nobody should use UPDATE lock to + * notify about object * removal. + */ + CDEBUG(D_OTHER, + "revalidate slaves for "DFID", rc %d\n", + PFID(&op_data->op_fid2), rc); + + LASSERT(fid_is_sane(&op_data->op_fid2)); + rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid2, it, rc, + cb_blocking, extra_lock_flags); + GOTO(out_free_sop_data, rc); + } - /* okay, MDS has returned success. probably name has been - * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, - flags, reqp, cb_blocking); + if (*reqp == NULL) + GOTO(out_free_sop_data, rc); + + /* + * okay, MDS has returned success. Probably name has been resolved in + * remote inode. + */ + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, + reqp, cb_blocking, extra_lock_flags); if (rc < 0) - RETURN(rc); + GOTO(out_free_sop_data, rc); - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); + /* + * Nothing is found, do not access body->fid1 as it is zero and thus + * pointless. + */ + if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG) + GOTO(out_free_sop_data, rc = 0); + + LASSERT(*reqp); + LASSERT((*reqp)->rq_repmsg); + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); - - cfid = &body->fid1; - obj2 = lmv_grab_obj(obd, cfid); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); - if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) { - /* wow! this is splitted dir, we'd like to handle it. */ - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); - LASSERT(body != NULL); + /* could not find object, FID is not present in response. */ + if (!(body->valid & OBD_MD_FLID)) + GOTO(out_free_sop_data, rc = 0); + + obj2 = lmv_obj_grab(obd, &body->fid1); + + if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) { - obj2 = lmv_create_obj(exp, &body->fid1, mea); + /* FIXME remote capability! */ + /* wow! this is split dir, we'd like to handle it. */ + obj2 = lmv_obj_create(exp, &body->fid1, mea); if (IS_ERR(obj2)) - RETURN(PTR_ERR(obj2)); + GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2)); } if (obj2) { - /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, rc); - - rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking); - lmv_put_obj(obj2); - } - RETURN(rc); -} + /* this is split dir and we'd want to get attrs */ + CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n", + PFID(&body->fid1), rc); -void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj) -{ - /* update size */ - body->size += obj->size; + rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, + cb_blocking, extra_lock_flags); + lmv_obj_put(obj2); + } - /* update atime */ - /* update ctime */ - /* update mtime */ - /* update nlink */ + EXIT; +out_free_sop_data: + OBD_FREE_PTR(sop_data); + return rc; } +/* this is not used currently */ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *body = NULL; struct lustre_handle *lockh; + struct md_op_data *op_data; struct ldlm_lock *lock; - struct mds_body *body2; - struct ll_uctxt uctxt; + struct mdt_body *body2; + struct mdt_body *body; struct lmv_obj *obj; int i, rc = 0; ENTRY; @@ -348,326 +534,390 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) LASSERT(reqp); LASSERT(*reqp); - /* master is locked. we'd like to take locks on slaves and update + /* + * Master is locked. we'd like to take locks on slaves and update * attributes to be returned from the slaves it's important that lookup * is called in two cases: - - * - for first time (dcache has no such a resolving yet). - * - ->d_revalidate() returned false. - - * last case possible only if all the objs (master and all slaves aren't - * valid */ - - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); + + * - for first time (dcache has no such a resolving yet). - + * ->d_revalidate() returned false. + + * Last case possible only if all the objs (master and all slaves aren't + * valid. + */ + + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + RETURN(-ENOMEM); + + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); - obj = lmv_grab_obj(obd, &body->fid1); + LASSERT((body->valid & OBD_MD_FLID) != 0); + obj = lmv_obj_grab(obd, &body->fid1); LASSERT(obj != NULL); - CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n", - (unsigned long)body->fid1.mds, - (unsigned long)body->fid1.id, - (unsigned long)body->fid1.generation); + CDEBUG(D_OTHER, "lookup slaves for "DFID"\n", + PFID(&body->fid1)); - uctxt.gid1 = 0; - uctxt.gid2 = 0; + lmv_obj_lock(obj); - lmv_lock_obj(obj); - - for (i = 0; i < obj->objcount; i++) { - struct ll_fid fid = obj->objs[i].fid; + for (i = 0; i < obj->lo_objcount; i++) { + struct lu_fid fid = obj->lo_inodes[i].li_fid; struct ptlrpc_request *req = NULL; + struct obd_export *tgt_exp; struct lookup_intent it; - if (fid_equal(&fid, &obj->fid)) + if (lu_fid_eq(&fid, &obj->lo_fid)) /* skip master obj */ continue; - CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n", - (unsigned long)fid.mds, (unsigned long)fid.id, - (unsigned long)fid.generation); + CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid)); /* is obj valid? */ memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, &fid, &it, 0, &req, - lmv_dirobj_blocking_ast); - + + memset(op_data, 0, sizeof(*op_data)); + op_data->op_fid1 = fid; + op_data->op_fid2 = fid; + op_data->op_bias = MDS_CROSS_REF; + + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); + if (IS_ERR(tgt_exp)) + GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + + rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, + &req, lmv_blocking_ast, 0); + lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; - if (rc > 0) { + if (rc > 0 && req == NULL) { /* nice, this slave is valid */ LASSERT(req == NULL); CDEBUG(D_OTHER, "cached\n"); goto release_lock; } - if (rc < 0) - /* error during revalidation */ - GOTO(cleanup, rc); - - /* rc == 0, this means we have no such a lock and can't think - * obj is still valid. lookup it again */ - LASSERT(req == NULL); - req = NULL; - - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, NULL, &it, 0, &req, - lmv_dirobj_blocking_ast); - - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - LASSERT(rc <= 0); - - if (rc < 0) + if (rc < 0) { /* error during lookup */ GOTO(cleanup, rc); - + } lock = ldlm_handle2lock(lockh); LASSERT(lock); - lock->l_ast_data = lmv_get_obj(obj); + lock->l_ast_data = lmv_obj_get(obj); + + body2 = lustre_msg_buf(req->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body2)); + LASSERT(body2 != NULL); + LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF)); - body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2)); - LASSERT(body2); + obj->lo_inodes[i].li_size = body2->size; - obj->objs[i].size = body2->size; - CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long)obj->objs[i].size); + (unsigned long)obj->lo_inodes[i].li_size); LDLM_LOCK_PUT(lock); if (req) ptlrpc_req_finished(req); release_lock: - lmv_update_body_from_obj(body, obj->objs + i); + lmv_update_body(body, obj->lo_inodes + i); - if (it.d.lustre.it_lock_mode) + if (it.d.lustre.it_lock_mode) { ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); + it.d.lustre.it_lock_mode = 0; + } } + + EXIT; cleanup: - lmv_unlock_obj(obj); - lmv_put_obj(obj); - RETURN(rc); + lmv_obj_unlock(obj); + lmv_obj_put(obj); + OBD_FREE_PTR(op_data); + return rc; } -int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, + void *lmm, int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; + struct lu_fid rpid = op_data->op_fid1; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; + struct md_op_data *sop_data; + struct lmv_stripe_md *mea; + struct mdt_body *body; struct lmv_obj *obj; - struct mea *mea; - int rc, mds; + int rc, loop = 0; + int mea_idx; + mdsno_t mds; ENTRY; - /* IT_LOOKUP is intended to produce name -> fid resolving (let's call + OBD_ALLOC_PTR(sop_data); + if (sop_data == NULL) + RETURN(-ENOMEM); + + /* save op_data fro repeat case */ + *sop_data = *op_data; + + /* + * IT_LOOKUP is intended to produce name -> fid resolving (let's call * this lookup below) or to confirm requested resolving is still valid - * (let's call this revalidation) cfid != NULL specifies revalidation */ - - if (cfid) { - /* this is revalidation: we have to check is LOOKUP lock still - * valid for given fid. very important part is that we have to - * choose right mds because namespace is per mds */ - rpfid = *pfid; - obj = lmv_grab_obj(obd, pfid); + * (let's call this revalidation) fid_is_sane(&sop_data->op_fid2) specifies + * revalidation. + */ + if (fid_is_sane(&op_data->op_fid2)) { + /* + * This is revalidate: we have to check is LOOKUP lock still + * valid for given fid. Very important part is that we have to + * choose right mds because namespace is per mds. + */ + rpid = op_data->op_fid1; + obj = lmv_obj_grab(obd, &rpid); if (obj) { - mds = raw_name2idx(obj->objcount, (char *) name, len); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); + mea_idx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + rpid = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; + sop_data->op_bias &= ~MDS_CHECK_SPLIT; + lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &rpid, &mds); + if (rc) + GOTO(out_free_sop_data, rc); + sop_data->op_bias |= MDS_CHECK_SPLIT; } - mds = rpfid.mds; - - CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, mds); - - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name, - len, lmm, lmmsize, cfid, it, flags, - reqp, cb_blocking); - RETURN(rc); - } - mds = pfid->mds; + CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n", + PFID(&op_data->op_fid2), mds); + } else { repeat: - /* this is lookup. during lookup we have to update all the attributes, - * because returned values will be put in struct inode */ - - obj = lmv_grab_obj(obd, pfid); - if (obj && len) { - /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *)name, len); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); + ++loop; + LASSERT(loop <= 2); + + /* + * This is lookup. During lookup we have to update all the + * attributes, because returned values will be put in struct + * inode. + */ + obj = lmv_obj_grab(obd, &op_data->op_fid1); + if (obj) { + if (op_data->op_namelen) { + /* directory is already split. calculate mds */ + mea_idx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + rpid = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; + } + sop_data->op_bias &= ~MDS_CHECK_SPLIT; + lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); + if (rc) + GOTO(out_free_sop_data, rc); + sop_data->op_bias |= MDS_CHECK_SPLIT; + } + fid_zero(&sop_data->op_fid2); } - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, NULL, it, flags, reqp, - cb_blocking); + sop_data->op_bias &= ~MDS_CROSS_REF; + sop_data->op_fid1 = rpid; + + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize, + it, flags, reqp, cb_blocking, extra_lock_flags); if (rc > 0) { - /* very interesting. it seems object is still valid but for some - * reason llite calls lookup, not revalidate */ - CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); - + LASSERT(fid_is_sane(&op_data->op_fid2)); + /* + * Very interesting. it seems object is still valid but for some + * reason llite calls lookup, not revalidate. + */ + CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n", + PFID(&rpid)); LASSERT(*reqp == NULL); - RETURN(rc); + GOTO(out_free_sop_data, rc); } if (rc == 0 && *reqp == NULL) { /* once again, we're asked for lookup, not revalidate */ - CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); - RETURN(rc); + CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n", + PFID(&rpid)); + GOTO(out_free_sop_data, rc); } - + if (rc == -ERESTART) { - /* directory got splitted since last update. this shouldn't be - * becasue splitting causes lock revocation, so revalidate had - * to fail and lookup on dir had to return mea */ + LASSERT(*reqp != NULL); + DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, + "Got -ERESTART during lookup!\n"); + ptlrpc_req_finished(*reqp); + *reqp = NULL; + it->d.lustre.it_data = 0; + /* + * Directory got split since last update. This shouldn't be + * because splitting causes lock revocation, so revalidate had + * to fail and lookup on dir had to return mea. + */ CWARN("we haven't knew about directory splitting!\n"); LASSERT(obj == NULL); - obj = lmv_create_obj(exp, &rpfid, NULL); + obj = lmv_obj_create(exp, &rpid, NULL); if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); - + GOTO(out_free_sop_data, rc = PTR_ERR(obj)); + lmv_obj_put(obj); goto repeat; } if (rc < 0) - RETURN(rc); - - /* okay, MDS has returned success. probably name has been resolved in - * remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); - - if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) { - /* wow! this is splitted dir, we'd like to handle it */ - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); + GOTO(out_free_sop_data, rc); + + /* + * Okay, MDS has returned success. Probably name has been resolved in + * remote inode. + */ + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, + cb_blocking, extra_lock_flags); + + if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) { + /* Wow! This is split dir, we'd like to handle it. */ + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); - - obj = lmv_grab_obj(obd, &body->fid1); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); + LASSERT((body->valid & OBD_MD_FLID) != 0); + + obj = lmv_obj_grab(obd, &body->fid1); if (!obj) { - obj = lmv_create_obj(exp, &body->fid1, mea); + obj = lmv_obj_create(exp, &body->fid1, mea); if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); - } else { - lmv_put_obj(obj); + GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); } + lmv_obj_put(obj); } - RETURN(rc); + EXIT; +out_free_sop_data: + OBD_FREE_PTR(sop_data); + return rc; } -int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, + void *lmm, int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; - int rc = 0; + int rc; ENTRY; - LASSERT(it); - LASSERT(pfid); + LASSERT(it != NULL); + LASSERT(fid_is_sane(&op_data->op_fid1)); - CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n", - LL_IT2STR(it), len, name, (unsigned long) pfid->id, - (unsigned long) pfid->generation, pfid->mds); + CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n", + LL_IT2STR(it), op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1)); rc = lmv_check_connect(obd); if (rc) RETURN(rc); - if (it->it_op == IT_LOOKUP) - rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, - cb_blocking); + if (it->it_op & IT_LOOKUP) + rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it, + flags, reqp, cb_blocking, + extra_lock_flags); else if (it->it_op & IT_OPEN) - rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, - cb_blocking); - else if (it->it_op == IT_GETATTR || it->it_op == IT_CHDIR) - rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, - cb_blocking); + rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it, + flags, reqp, cb_blocking, + extra_lock_flags); + else if (it->it_op & IT_GETATTR) + rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it, + flags, reqp, cb_blocking, + extra_lock_flags); else LBUG(); RETURN(rc); } int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, - struct ll_fid *mfid, struct lookup_intent *oit, - int master_valid, ldlm_blocking_callback cb_blocking) + const struct lu_fid *mid, struct lookup_intent *oit, + int master_valid, ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *mreq = *reqp; struct lmv_obd *lmv = &obd->u.lmv; struct lustre_handle master_lockh; + struct obd_export *tgt_exp; + struct md_op_data *op_data; struct ldlm_lock *lock; unsigned long size = 0; - struct mds_body *body; - struct ll_uctxt uctxt; + struct mdt_body *body; struct lmv_obj *obj; int master_lock_mode; int i, rc = 0; ENTRY; - /* we have to loop over the subobjects, check validity and update them + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + RETURN(-ENOMEM); + + /* + * We have to loop over the subobjects, check validity and update them * from MDSs if needed. it's very useful that we need not to update all * the fields. say, common fields (that are equal on all the subojects * need not to be update, another fields (i_size, for example) are - * cached all the time */ - obj = lmv_grab_obj(obd, mfid); + * cached all the time. + */ + obj = lmv_obj_grab(obd, mid); LASSERT(obj != NULL); - uctxt.gid1 = 0; - uctxt.gid2 = 0; master_lock_mode = 0; - lmv_lock_obj(obj); - - for (i = 0; i < obj->objcount; i++) { - struct ll_fid fid = obj->objs[i].fid; + lmv_obj_lock(obj); + + for (i = 0; i < obj->lo_objcount; i++) { + struct lu_fid fid = obj->lo_inodes[i].li_fid; struct lustre_handle *lockh = NULL; struct ptlrpc_request *req = NULL; ldlm_blocking_callback cb; struct lookup_intent it; int master = 0; - CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n", - (unsigned long)fid.mds, (unsigned long)fid.id, - (unsigned long) fid.generation); + CDEBUG(D_OTHER, "revalidate subobj "DFID"\n", + PFID(&fid)); + memset(op_data, 0, sizeof(*op_data)); memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - cb = lmv_dirobj_blocking_ast; - if (fid_equal(&fid, &obj->fid)) { + cb = lmv_blocking_ast; + + if (lu_fid_eq(&fid, &obj->lo_fid)) { if (master_valid) { - /* lmv_intent_getattr() already checked - * validness and took the lock */ + /* + * lmv_intent_getattr() already checked + * validness and took the lock. + */ if (mreq) { - /* it even got the reply refresh attrs - * from that reply */ + /* + * It even got the reply refresh attrs + * from that reply. + */ body = lustre_msg_buf(mreq->rq_repmsg, - 1, sizeof(*body)); + DLM_REPLY_REC_OFF, + sizeof(*body)); LASSERT(body != NULL); - goto update; + LASSERT(lustre_rep_swabbed( + mreq, DLM_REPLY_REC_OFF)); + goto update; } /* take already cached attrs into account */ CDEBUG(D_OTHER, @@ -678,97 +928,98 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, cb = cb_blocking; } - /* is obj valid? */ - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, &fid, &it, 0, &req, cb); - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - if (rc > 0) { - /* nice, this slave is valid */ + op_data->op_fid1 = fid; + op_data->op_fid2 = fid; + op_data->op_bias = MDS_CROSS_REF; + + /* Is obj valid? */ + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); + if (IS_ERR(tgt_exp)) + GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + + rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb, + extra_lock_flags); + + lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; + if (rc > 0 && req == NULL) { + /* Nice, this slave is valid */ LASSERT(req == NULL); CDEBUG(D_OTHER, "cached\n"); goto release_lock; } if (rc < 0) - /* error during revalidation */ GOTO(cleanup, rc); - /* rc == 0, this means we have no such a lock and can't think - * obj is still valid. lookup it again */ - LASSERT(req == NULL); - req = NULL; - - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, NULL, &it, 0, &req, cb); - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - LASSERT(rc <= 0); - - if (rc < 0) - /* error during lookup */ - GOTO(cleanup, rc); - if (master) { LASSERT(master_valid == 0); - /* save lock on master to be returned to the caller */ + /* Save lock on master to be returned to the caller. */ CDEBUG(D_OTHER, "no lock on master yet\n"); memcpy(&master_lockh, lockh, sizeof(master_lockh)); master_lock_mode = it.d.lustre.it_lock_mode; it.d.lustre.it_lock_mode = 0; } else { - /* this is slave. we want to control it */ + /* This is slave. We want to control it. */ lock = ldlm_handle2lock(lockh); - LASSERT(lock); - lock->l_ast_data = lmv_get_obj(obj); + LASSERT(lock != NULL); + lock->l_ast_data = lmv_obj_get(obj); LDLM_LOCK_PUT(lock); } if (*reqp == NULL) { - /* this is first reply, we'll use it to return - * updated data back to the caller */ + /* + * This is first reply, we'll use it to return updated + * data back to the caller. + */ LASSERT(req); ptlrpc_request_addref(req); *reqp = req; - } - body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body)); - LASSERT(body); - + body = lustre_msg_buf(req->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF)); + update: - obj->objs[i].size = body->size; - + obj->lo_inodes[i].li_size = body->size; + CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long)obj->objs[i].size); + (unsigned long)obj->lo_inodes[i].li_size); if (req) ptlrpc_req_finished(req); release_lock: - size += obj->objs[i].size; + size += obj->lo_inodes[i].li_size; - if (it.d.lustre.it_lock_mode) + if (it.d.lustre.it_lock_mode) { ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); + it.d.lustre.it_lock_mode = 0; + } } if (*reqp) { - /* some attrs got refreshed, we have reply and it's time to put - * fresh attrs to it */ + /* + * Some attrs got refreshed, we have reply and it's time to put + * fresh attrs to it. + */ CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n", (unsigned long)size); - - body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); - LASSERT(body); - /* FIXME: what about other attributes? */ + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF)); + body->size = size; - + if (mreq == NULL) { - /* very important to maintain lli->mds the same because + /* + * Very important to maintain mds num the same because * of revalidation. mreq == NULL means that caller has - * no reply and the only attr we can return is size */ + * no reply and the only attr we can return is size. + */ body->valid = OBD_MD_FLSIZE; - body->mds = obj->fid.mds; } if (master_valid == 0) { memcpy(&oit->d.lustre.it_lock_handle, @@ -777,14 +1028,17 @@ release_lock: } rc = 0; } else { - /* it seems all the attrs are fresh and we did no request */ + /* It seems all the attrs are fresh and we did no request */ CDEBUG(D_OTHER, "all the attrs were fresh\n"); if (master_valid == 0) oit->d.lustre.it_lock_mode = master_lock_mode; rc = 1; } + + EXIT; cleanup: - lmv_unlock_obj(obj); - lmv_put_obj(obj); - RETURN(rc); + OBD_FREE_PTR(op_data); + lmv_obj_unlock(obj); + lmv_obj_put(obj); + return rc; }