From 46aaad58c8519994129ef484a4b6945e4c253b19 Mon Sep 17 00:00:00 2001 From: yury Date: Sat, 19 Jun 2004 17:15:26 +0000 Subject: [PATCH] - added mutex to lmv_obj to protect slaves attrs. - changed semantic of lmv_create_obj(). By now it returns created object, as mostly this is what all expect from it. --- lustre/lmv/lmv_intent.c | 79 +++++++++------- lustre/lmv/lmv_internal.h | 72 ++++++++------ lustre/lmv/lmv_obd.c | 233 ++++++++++++++++++++++++++-------------------- lustre/lmv/lmv_objmgr.c | 38 ++++---- 4 files changed, 243 insertions(+), 179 deletions(-) diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 3925c8a..bf3f7b3 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -181,13 +181,11 @@ repeat: cfid = &body->fid1; obj = lmv_grab_obj(obd, cfid); - if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) { + if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ - rc = lmv_create_obj(exp, &body->fid1, mea); - if (rc) - RETURN(rc); - - obj = lmv_grab_obj(obd, cfid); + obj = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); } if (obj) { @@ -301,16 +299,14 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt, cfid = &body->fid1; obj2 = lmv_grab_obj(obd, cfid); - if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) { + if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it. */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - rc = lmv_create_obj(exp, &body->fid1, mea); - if (rc) - RETURN(rc); - - obj2 = lmv_grab_obj(obd, cfid); + obj2 = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj2)) + RETURN(PTR_ERR(obj2)); } if (obj2) { @@ -366,7 +362,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) LASSERT(body != NULL); obj = lmv_grab_obj(obd, &body->fid1); - LASSERT(obj); + LASSERT(obj != NULL); CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n", (unsigned long)body->fid1.mds, @@ -375,6 +371,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) uctxt.gid1 = 0; uctxt.gid2 = 0; + + lmv_lock_obj(obj); for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; @@ -395,6 +393,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, &fid, &it, 0, &req, lmv_dirobj_blocking_ast); + lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; if (rc > 0) { /* nice, this slave is valid */ @@ -411,6 +410,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; + memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, @@ -434,7 +434,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) obj->objs[i].size = body2->size; CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long) obj->objs[i].size); + (unsigned long)obj->objs[i].size); LDLM_LOCK_PUT(lock); @@ -442,12 +442,13 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) ptlrpc_req_finished(req); release_lock: lmv_update_body_from_obj(body, obj->objs + i); + if (it.d.lustre.it_lock_mode) ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); } cleanup: - if (obj) - lmv_put_obj(obj); + lmv_unlock_obj(obj); + lmv_put_obj(obj); RETURN(rc); } @@ -530,34 +531,40 @@ repeat: } if (rc == -ERESTART) { - /* directory got splitted since last update. this shouldn't - * be becasue splitting causes lock revocation, so revalidate - * had to fail and lookup on dir had to return mea */ + /* directory got splitted since last update. this shouldn't be + * becasue splitting causes lock revocation, so revalidate had + * to fail and lookup on dir had to return mea */ CWARN("we haven't knew about directory splitting!\n"); LASSERT(obj == NULL); - rc = lmv_create_obj(exp, &rpfid, NULL); - if (rc) - RETURN(rc); + + obj = lmv_create_obj(exp, &rpfid, NULL); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + goto repeat; } if (rc < 0) RETURN(rc); - /* okay, MDS has returned success. probably name has been - * resolved in remote inode */ + /* okay, MDS has returned success. probably name has been resolved in + * remote inode */ rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, reqp, cb_blocking); - if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) { + if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - if ((obj = lmv_grab_obj(obd, &body->fid1))) + obj = lmv_grab_obj(obd, &body->fid1); + if (!obj) { + obj = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + } else { lmv_put_obj(obj); - else - rc = lmv_create_obj(exp, &body->fid1, mea); + } } RETURN(rc); @@ -625,11 +632,13 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, * need not to be update, another fields (i_size, for example) are * cached all the time */ obj = lmv_grab_obj(obd, mfid); - LASSERT(obj); + LASSERT(obj != NULL); uctxt.gid1 = 0; uctxt.gid2 = 0; master_lock_mode = 0; + + lmv_lock_obj(obj); for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; @@ -687,12 +696,14 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; + memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, NULL, &it, 0, &req, cb); lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; LASSERT(rc <= 0); + if (rc < 0) /* error during lookup */ GOTO(cleanup, rc); @@ -727,7 +738,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, update: obj->objs[i].size = body->size; CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long) obj->objs[i].size); + (unsigned long)obj->objs[i].size); if (req) ptlrpc_req_finished(req); @@ -738,8 +749,8 @@ release_lock: } if (*reqp) { - /* some attrs got refreshed, we have reply and it's time - * to put fresh attrs to it */ + /* some attrs got refreshed, we have reply and it's time to put + * fresh attrs to it */ CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n", (unsigned long) size); body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); @@ -768,7 +779,7 @@ release_lock: rc = 1; } cleanup: - if (obj) - lmv_put_obj(obj); + lmv_unlock_obj(obj); + lmv_put_obj(obj); RETURN(rc); } diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 382df09..d4623db 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -1,48 +1,67 @@ #ifndef _LMV_INTERNAL_H_ #define _LMV_INTERNAL_H_ -#define LL_IT2STR(it) ((it) ? ldlm_it2str((it)->it_op) : "0") -#define MEA_SIZE_LMV(lmv) \ - ((lmv)->desc.ld_tgt_count * sizeof(struct ll_fid) + sizeof(struct mea)) +#define LL_IT2STR(it) \ + ((it) ? ldlm_it2str((it)->it_op) : "0") + +#define MEA_SIZE_LMV(lmv) \ + ((lmv)->desc.ld_tgt_count * \ + sizeof(struct ll_fid) + sizeof(struct mea)) struct lmv_inode { - struct ll_fid fid; /* fid of dirobj */ - unsigned long size; - int flags; + struct ll_fid fid; /* fid of dirobj */ + unsigned long size; + int flags; }; struct lmv_obj { - struct list_head list; - struct semaphore guard; - int freeing; /* object ig freeing. */ - atomic_t count; - struct ll_fid fid; /* master fid of dir */ - void *update; /* bitmap of status (uptodate) */ - int objcount; - struct lmv_inode *objs; /* array of dirobjs */ - struct obd_device *obd; /* pointer to LMV itself */ + struct list_head list; + struct semaphore guard; + int freeing; /* object is freeing. */ + atomic_t count; + struct ll_fid fid; /* master fid of dir */ + void *update; /* bitmap of status (uptodate) */ + int objcount; + struct lmv_inode *objs; /* array of dirobjs */ + struct obd_device *obd; /* pointer to LMV itself */ }; +static inline void +lmv_lock_obj(struct lmv_obj *obj) +{ + LASSERT(obj); + down(&obj->guard); +} + +static inline void +lmv_unlock_obj(struct lmv_obj *obj) +{ + LASSERT(obj); + up(&obj->guard); +} + +void lmv_add_obj(struct lmv_obj *obj); +void lmv_del_obj(struct lmv_obj *obj); + +void lmv_put_obj(struct lmv_obj *obj); +void lmv_free_obj(struct lmv_obj *obj); + int lmv_setup_mgr(struct obd_device *obd); void lmv_cleanup_mgr(struct obd_device *obd); int lmv_check_connect(struct obd_device *obd); -void lmv_put_obj(struct lmv_obj *obj); struct lmv_obj *lmv_get_obj(struct lmv_obj *obj); struct lmv_obj *lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid); -void lmv_free_obj(struct lmv_obj *obj); -void lmv_add_obj(struct lmv_obj *obj); -void lmv_del_obj(struct lmv_obj *obj); - struct lmv_obj *lmv_alloc_obj(struct obd_device *obd, struct ll_fid *fid, struct mea *mea); -int lmv_create_obj(struct obd_export *exp, struct ll_fid *fid, - struct mea *mea); +struct lmv_obj *lmv_create_obj(struct obd_export *exp, + struct ll_fid *fid, + struct mea *mea); int lmv_destroy_obj(struct obd_export *exp, struct ll_fid *fid); @@ -71,12 +90,11 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, ldlm_blocking_callback cb_blocking); int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *); - int lmv_dirobj_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *, int); static inline struct mea * -is_body_of_splitted_dir(struct ptlrpc_request *req, int offset) +body_of_splitted_dir(struct ptlrpc_request *req, int offset) { struct mds_body *body; struct mea *mea; @@ -88,7 +106,8 @@ is_body_of_splitted_dir(struct ptlrpc_request *req, int offset) if (!body || !S_ISDIR(body->mode) || !body->eadatasize) return NULL; - mea = lustre_msg_buf(req->rq_repmsg, offset + 1, body->eadatasize); + mea = lustre_msg_buf(req->rq_repmsg, + offset + 1, body->eadatasize); LASSERT(mea); if (mea->mea_count == 0) @@ -97,7 +116,8 @@ is_body_of_splitted_dir(struct ptlrpc_request *req, int offset) return mea; } -static inline int fid_equal(struct ll_fid *fid1, struct ll_fid *fid2) +static inline int +fid_equal(struct ll_fid *fid1, struct ll_fid *fid2) { if (fid1->mds != fid2->mds) return 0; diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 7ef1329..681893d 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -544,23 +544,25 @@ static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); + LASSERT(i < lmv->desc.ld_tgt_count); + + rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid, + ea_size, request); + if (rc) + RETURN(rc); + obj = lmv_grab_obj(obd, fid); CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n", (unsigned long)fid->mds, (unsigned long)fid->id, (unsigned long)fid->generation, obj ? "(splitted)" : ""); - LASSERT(fid->mds < lmv->desc.ld_tgt_count); - rc = md_getattr(lmv->tgts[i].ltd_exp, fid, - valid, ea_size, request); - if (rc == 0 && obj) { + if (obj) { /* we have to loop over dirobjs here and gather attrs for all * the slaves. */ #warning "attrs gathering here" - } - - if (obj) lmv_put_obj(obj); + } RETURN(rc); } @@ -577,9 +579,8 @@ static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); - CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", - (unsigned long) fid->mds, (unsigned long) fid->id, - (unsigned long) fid->generation); + CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", (unsigned long)fid->mds, + (unsigned long)fid->id, (unsigned long)fid->generation); LASSERT(fid->mds < lmv->desc.ld_tgt_count); @@ -598,22 +599,25 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid, struct lmv_obj *obj; int rc = 0, mds; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); + LASSERT(pfid->mds < lmv->desc.ld_tgt_count); LASSERT(cfid->mds < lmv->desc.ld_tgt_count); + CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu:%*s -> %lu/%lu/%lu\n", - (unsigned long) pfid->mds, (unsigned long) pfid->id, - (unsigned long) pfid->generation, len, name, - (unsigned long) cfid->mds, (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)pfid->mds, (unsigned long)pfid->id, + (unsigned long)pfid->generation, len, name, + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); - /* this is default mds for directory name belongs to */ + /* this is default mds for directory name belongs to. */ mds = pfid->mds; obj = lmv_grab_obj(obd, pfid); if (obj) { - /* directory is splitted. look for right mds for this name */ + /* directory is splitted. look for right mds for this name. */ mds = raw_name2idx(obj->objcount, name, len); lmv_put_obj(obj); } @@ -660,6 +664,7 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ptlrpc_request *req = NULL; + struct lmv_obj *obj; struct lustre_md md; unsigned long valid; int mealen, rc; @@ -686,7 +691,10 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) if (md.mea == NULL) GOTO(cleanup, rc = -ENODATA); - rc = lmv_create_obj(exp, fid, md.mea); + obj = lmv_create_obj(exp, fid, md.mea); + if (IS_ERR(obj)) + rc = PTR_ERR(obj); + obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea); cleanup: @@ -722,27 +730,32 @@ repeat: } CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n", - op_data->namelen, op_data->name, - (unsigned long) op_data->fid1.mds, - (unsigned long) op_data->fid1.id, - (unsigned long) op_data->fid1.generation); + op_data->namelen, op_data->name, + (unsigned long)op_data->fid1.mds, + (unsigned long)op_data->fid1.id, + (unsigned long)op_data->fid1.generation); + rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data, datalen, mode, uid, gid, rdev, request); if (rc == 0) { + if (*request == NULL) - RETURN(rc); + RETURN(rc); + mds_body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*mds_body)); LASSERT(mds_body != NULL); + CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n", - (unsigned long) mds_body->fid1.id, - (unsigned long) mds_body->fid1.generation, + (unsigned long)mds_body->fid1.id, + (unsigned long)mds_body->fid1.generation, op_data->fid1.mds); + LASSERT(mds_body->valid & OBD_MD_MDS || mds_body->mds == op_data->fid1.mds); } else if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ + /* directory got splitted. time to update local object and + * repeat the request with proper MDS */ rc = lmv_get_mea_and_update_object(exp, &op_data->fid1); if (rc == 0) { ptlrpc_req_finished(*request); @@ -769,11 +782,10 @@ int lmv_done_writing(struct obd_export *exp, struct obdo *obdo) } int lmv_enqueue_slaves(struct obd_export *exp, int locktype, - struct lookup_intent *it, int lockmode, - struct mdc_op_data *data, struct lustre_handle *lockh, - void *lmm, int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, void *cb_data) + struct lookup_intent *it, int lockmode, + struct mdc_op_data *data, struct lustre_handle *lockh, + void *lmm, int lmmsize, ldlm_completion_callback cb_completion, + ldlm_blocking_callback cb_blocking, void *cb_data) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -826,8 +838,7 @@ cleanup: int lmv_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, int lock_mode, struct mdc_op_data *data, struct lustre_handle *lockh, - void *lmm, int lmmsize, - ldlm_completion_callback cb_completion, + void *lmm, int lmmsize, ldlm_completion_callback cb_completion, ldlm_blocking_callback cb_blocking, void *cb_data) { struct obd_device *obd = exp->exp_obd; @@ -850,17 +861,17 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, if (data->namelen) { obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - /* directory is splitted. look for - * right mds for this name */ + /* directory is splitted. look for right mds for this + * name */ mds = raw_name2idx(obj->objcount, (char *)data->name, data->namelen); data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); } } - CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", - LL_IT2STR(it), (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation); + CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", LL_IT2STR(it), + (unsigned long)data->fid1.id, (unsigned long)data->fid1.generation); + rc = md_enqueue(lmv->tgts[data->fid1.mds].ltd_exp, lock_type, it, lock_mode, data, lockh, lmm, lmmsize, cb_completion, cb_blocking, cb_data); @@ -869,8 +880,8 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, } int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, - char *filename, int namelen, unsigned long valid, - unsigned int ea_size, struct ptlrpc_request **request) + char *filename, int namelen, unsigned long valid, + unsigned int ea_size, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -890,17 +901,19 @@ repeat: rfid = obj->objs[mds].fid; lmv_put_obj(obj); } + CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu -> %lu/%lu/%lu\n", - namelen, filename, (unsigned long) fid->mds, - (unsigned long) fid->id, (unsigned long) fid->generation, - (unsigned long) rfid.mds, (unsigned long) rfid.id, - (unsigned long) rfid.generation); - rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, namelen, - valid, ea_size, request); + namelen, filename, (unsigned long)fid->mds, + (unsigned long)fid->id, (unsigned long)fid->generation, + (unsigned long)rfid.mds, (unsigned long)rfid.id, + (unsigned long)rfid.generation); + + rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, + namelen, valid, ea_size, request); if (rc == 0) { - /* this could be cross-node reference. in this case all - * we have right now is mds/ino/generation triple. we'd - * like to find other attributes */ + /* this could be cross-node reference. in this case all we have + * right now is mds/ino/generation triple. we'd like to find + * other attributes */ body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body)); LASSERT(body != NULL); if (body->valid & OBD_MD_MDS) { @@ -916,8 +929,8 @@ repeat: *request = req; } } else if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ + /* directory got splitted. time to update local object and + * repeat the request with proper MDS */ rc = lmv_get_mea_and_update_object(exp, &rfid); if (rc == 0) { ptlrpc_req_finished(*request); @@ -929,8 +942,8 @@ repeat: /* - * llite passes fid of an target inode in data->fid1 and - * fid of directory in data->fid2 + * llite passes fid of an target inode in data->fid1 and fid of directory in + * data->fid2 */ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct ptlrpc_request **request) @@ -940,9 +953,11 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct lmv_obj *obj; int rc; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); + if (data->namelen != 0) { /* usual link request */ obj = lmv_grab_obj(obd, &data->fid1); @@ -952,17 +967,22 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, data->fid1 = obj->objs[rc].fid; lmv_put_obj(obj); } - CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n", - (unsigned) data->fid2.mds, (unsigned) data->fid2.id, - (unsigned) data->fid2.generation, data->namelen, - data->name, (unsigned) data->fid1.mds, - (unsigned) data->fid1.id, - (unsigned) data->fid1.generation, data->fid1.mds); + + CDEBUG(D_OTHER,"link %lu/%lu/%lu:%*s to %lu/%lu/%lu mds %lu\n", + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, + data->namelen, data->name, + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, + (unsigned long)data->fid1.mds); } else { /* request from MDS to acquire i_links for inode by fid1 */ - CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n", - (unsigned) data->fid1.mds, (unsigned) data->fid1.id, - (unsigned) data->fid1.generation); + CDEBUG(D_OTHER, "inc i_nlinks for %lu/%lu/%lu\n", + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation); } rc = md_link(lmv->tgts[data->fid1.mds].ltd_exp, data, request); @@ -980,64 +1000,65 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, ENTRY; CDEBUG(D_OTHER, "rename %*s in %lu/%lu/%lu to %*s in %lu/%lu/%lu\n", - oldlen, old, (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, + oldlen, old, (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, newlen, new, (unsigned long) data->fid2.mds, (unsigned long) data->fid2.id, (unsigned long) data->fid2.generation); + if (!fid_equal(&data->fid1, &data->fid2)) CWARN("cross-node rename %lu/%lu/%lu:%*s to %lu/%lu/%lu:%*s\n", - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, oldlen, old, - (unsigned long) data->fid2.mds, - (unsigned long) data->fid2.id, - (unsigned long) data->fid2.generation, newlen, new); + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, oldlen, old, + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, newlen, new); rc = lmv_check_connect(obd); if (rc) RETURN(rc); if (oldlen == 0) { - /* MDS with old dir entry is asking another MDS - * to create name there */ + /* MDS with old dir entry is asking another MDS to create name + * there */ CDEBUG(D_OTHER, "create %*s(%d/%d) in %lu/%lu/%lu pointing to %lu/%lu/%lu\n", newlen, new, oldlen, newlen, - (unsigned long) data->fid2.mds, - (unsigned long) data->fid2.id, - (unsigned long) data->fid2.generation, - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation); + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation); mds = data->fid2.mds; goto request; } obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - /* directory is already splitted, so we have to forward - * request to the right MDS */ + /* directory is already splitted, so we have to forward request + * to the right MDS */ mds = raw_name2idx(obj->objcount, (char *)old, oldlen); data->fid1 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, - (unsigned long) obj->objs[mds].fid.mds, - (unsigned long) obj->objs[mds].fid.id, - (unsigned long) obj->objs[mds].fid.generation); + (unsigned long)obj->objs[mds].fid.mds, + (unsigned long)obj->objs[mds].fid.id, + (unsigned long)obj->objs[mds].fid.generation); lmv_put_obj(obj); } obj = lmv_grab_obj(obd, &data->fid2); if (obj) { - /* directory is already splitted, so we have to forward - * request to the right MDS */ + /* directory is already splitted, so we have to forward request + * to the right MDS */ mds = raw_name2idx(obj->objcount, (char *)new, newlen); data->fid2 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, - (unsigned long) obj->objs[mds].fid.mds, - (unsigned long) obj->objs[mds].fid.id, - (unsigned long) obj->objs[mds].fid.generation); + (unsigned long)obj->objs[mds].fid.mds, + (unsigned long)obj->objs[mds].fid.id, + (unsigned long)obj->objs[mds].fid.generation); lmv_put_obj(obj); } @@ -1066,20 +1087,27 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); obj = lmv_grab_obj(obd, &data->fid1); + CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n", - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, iattr->ia_valid, + (unsigned long)data->fid1.mds, (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, iattr->ia_valid, obj ? ", splitted" : ""); + if (obj) { for (i = 0; i < obj->objcount; i++) { data->fid1 = obj->objs[i].fid; - rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, - ealen, ea2, ea2len, &req); - LASSERT(rc == 0); + + rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, + ea, ealen, ea2, ea2len, &req); + if (rc) { + lmv_put_obj(obj); + ptlrpc_req_finished(req); + RETURN(rc); + } + if (fid_equal(&obj->fid, &obj->objs[i].fid)) { - /* this is master object and this request - * should be returned back to llite */ + /* this is master object and this request should + * be returned back to llite */ *request = req; } else { ptlrpc_req_finished(req); @@ -1087,12 +1115,12 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, } lmv_put_obj(obj); } else { - LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count); + LASSERT(i < lmv->desc.ld_tgt_count); rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen, ea2, ea2len, request); if (rc == 0) { mds_body = lustre_msg_buf((*request)->rq_repmsg, 0, - sizeof(*mds_body)); + sizeof(*mds_body)); LASSERT(mds_body != NULL); LASSERT(mds_body->mds == i); } @@ -1192,14 +1220,17 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, obj = lmv_grab_obj(obd, mdc_fid); if (obj) { - /* find dirobj containing page with requested offset */ - /* FIXME: what about protecting cached attrs here? */ + lmv_lock_obj(obj); + + /* find dirobj containing page with requested offset. */ for (i = 0; i < obj->objcount; i++) { if (offset < obj->objs[i].size) break; offset -= obj->objs[i].size; } rfid = obj->objs[i].fid; + + lmv_unlock_obj(obj); lmv_put_obj(obj); CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n", diff --git a/lustre/lmv/lmv_objmgr.c b/lustre/lmv/lmv_objmgr.c index 7ba5a7a..f79709d 100644 --- a/lustre/lmv/lmv_objmgr.c +++ b/lustre/lmv/lmv_objmgr.c @@ -193,22 +193,22 @@ __lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid) /* check if object is in progress of destroying. If so - skip * it. */ - down(&obj->guard); + lmv_lock_obj(obj); if (obj->freeing) { - up(&obj->guard); + lmv_unlock_obj(obj); continue; } /* check if this is waht we're looking for. */ if (fid_equal(&obj->fid, fid)) { __lmv_get_obj(obj); - up(&obj->guard); + lmv_unlock_obj(obj); return obj; } - - up(&obj->guard); + lmv_unlock_obj(obj); } + return NULL; } @@ -269,7 +269,7 @@ __lmv_create_obj(struct obd_device *obd, struct ll_fid *fid, /* creates object from passed @fid and @mea. If @mea is NULL, it will be * obtained from correct MDT and used for constructing the object. */ -int +struct lmv_obj * lmv_create_obj(struct obd_export *exp, struct ll_fid *fid, struct mea *mea) { @@ -278,7 +278,7 @@ lmv_create_obj(struct obd_export *exp, struct ptlrpc_request *req = NULL; struct lmv_obj *obj; struct lustre_md md; - int mealen, i, rc = 0; + int mealen, i, rc; ENTRY; CDEBUG(D_OTHER, "get mea for %lu/%lu/%lu and create lmv obj\n", @@ -300,17 +300,17 @@ lmv_create_obj(struct obd_export *exp, valid, mealen, &req); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); - GOTO(cleanup, rc); + GOTO(cleanup, obj = ERR_PTR(rc)); } rc = mdc_req2lustre_md(exp, req, 0, NULL, &md); if (rc) { CERROR("mdc_req2lustre_md() failed, error %d\n", rc); - GOTO(cleanup, rc); + GOTO(cleanup, obj = ERR_PTR(rc)); } if (!md.mea) - GOTO(cleanup, rc = -ENODATA); + GOTO(cleanup, obj = ERR_PTR(-ENODATA)); mea = md.mea; } @@ -321,14 +321,14 @@ lmv_create_obj(struct obd_export *exp, CERROR("Can't create new object %lu/%lu/%lu\n", (unsigned long)fid->mds, (unsigned long)fid->id, (unsigned long)fid->generation); - GOTO(cleanup, rc = -ENOMEM); + GOTO(cleanup, obj = ERR_PTR(-ENOMEM)); } lmv_put_obj(obj); cleanup: if (req) ptlrpc_req_finished(req); - RETURN(rc); + RETURN(obj); } /* looks for object with @fid and orders to destroy it. It possible the object @@ -340,25 +340,27 @@ lmv_destroy_obj(struct obd_export *exp, struct ll_fid *fid) { struct obd_device *obd = exp->exp_obd; struct lmv_obj *obj; - int rc = 0; ENTRY; spin_lock(&lmv_obj_list_lock); obj = __lmv_grab_obj(obd, fid); if (obj) { - down(&obj->guard); + + /* marking object as "freeing in progress" */ + lmv_lock_obj(obj); obj->freeing = 1; - up(&obj->guard); - + lmv_unlock_obj(obj); + __lmv_put_obj(obj); __lmv_put_obj(obj); - rc = 1; + spin_unlock(&lmv_obj_list_lock); + RETURN(1); } spin_unlock(&lmv_obj_list_lock); - RETURN(rc); + RETURN(0); } int -- 1.8.3.1