- fixed possible race in lmv_create_obj().
- cleanups in API of lmv object manager.
- removed some redundant checks for return code.
- added error handling for lmv_create_obj() in various places.
- coding style and indentation fixes in lmv code.
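For reference, the race fix in lmv_create_obj() boils down to a standard
lookup-or-create pattern: allocate the new object outside the spinlock, then
re-check the object list under the lock and drop the fresh allocation if some
other thread managed to insert the object in the meantime (see
__lmv_create_obj() below). The stand-alone sketch below is not part of the
patch and only illustrates the idea; it uses a pthread mutex, a plain int
refcount and a singly-linked list in place of spinlock_t, atomic_t and
lmv_obj_list, and every name in it is made up for the example.

/*
 * Illustration only (assumed names, not the LMV API): allocate outside
 * the lock, re-check under the lock, free the loser.
 */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

struct demo_fid { unsigned long mds, id, generation; };

struct demo_obj {
        struct demo_obj *next;          /* simple singly-linked list */
        struct demo_fid fid;
        int count;                      /* reference counter */
};

static struct demo_obj *demo_list;
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;

/* lookup under demo_lock; returns the object with an extra reference taken */
static struct demo_obj *__demo_grab(const struct demo_fid *fid)
{
        struct demo_obj *obj;

        for (obj = demo_list; obj != NULL; obj = obj->next) {
                if (memcmp(&obj->fid, fid, sizeof(*fid)) == 0) {
                        obj->count++;
                        return obj;
                }
        }
        return NULL;
}

static struct demo_obj *demo_create_obj(const struct demo_fid *fid)
{
        struct demo_obj *obj, *existing;

        /* allocation may sleep, so do it outside the lock */
        obj = calloc(1, sizeof(*obj));
        if (obj == NULL)
                return NULL;
        obj->fid = *fid;

        pthread_mutex_lock(&demo_lock);
        existing = __demo_grab(fid);
        if (existing != NULL) {
                /* somebody inserted the object while we were allocating */
                pthread_mutex_unlock(&demo_lock);
                free(obj);
                return existing;
        }
        obj->count = 2;         /* one ref for the list, one for the caller */
        obj->next = demo_list;
        demo_list = obj;
        pthread_mutex_unlock(&demo_lock);
        return obj;
}

The two references taken at insert time mirror what __lmv_create_obj() does
with lmv_get_obj(): one reference belongs to lmv_obj_list and is dropped by
lmv_cleanup_mgr(), the other belongs to the caller and is dropped with
lmv_put_obj().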
int rc, mds;
ENTRY;
- /* IT_OPEN is intended to open (and create, possible) an object.
- * parent (pfid) may be splitted dir */
+ /* IT_OPEN is intended to open (and possibly create) an object. Parent
+ * (pfid) may be splitted dir */
repeat:
mds = rpfid.mds;
- obj = lmv_grab_obj(obd, &rpfid, 0);
+ obj = lmv_grab_obj(obd, &rpfid);
if (obj) {
/* directory is already splitted, so we have to forward
* request to the right MDS */
mds = raw_name2idx(obj->objcount, (char *)name, len);
- rpfid = obj->objs[mds].fid;
CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
+
+ rpfid = obj->objs[mds].fid;
+ lmv_put_obj(obj);
}
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
len, lmm, lmmsize, cfid, it, flags, reqp,
cb_blocking);
- lmv_put_obj(obj);
if (rc == -ERESTART) {
/* directory got splitted. time to update local object
* and repeat the request with proper MDS */
if (rc != 0)
RETURN(rc);
- /* okay, MDS has returned success. probably name has been
- * resolved in remote inode */
- rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
- reqp, cb_blocking);
+ /* okay, MDS has returned success. Probably name has been resolved in
+ * remote inode */
+ rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it,
+ flags, reqp, cb_blocking);
if (rc != 0) {
LASSERT(rc < 0);
RETURN(rc);
}
- /* caller may use attrs MDS returns on IT_OPEN lock request
- * so, we have to update them for splitted dir */
+ /* caller may use attrs MDS returns on IT_OPEN lock request, so we have
+ * to update them for splitted dir */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
+
cfid = &body->fid1;
- obj = lmv_grab_obj(obd, cfid, 0);
- if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+ obj = lmv_grab_obj(obd, cfid);
+ if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
/* wow! this is splitted dir, we'd like to handle it */
- rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
+ rc = lmv_create_obj(exp, &body->fid1, mea);
+ if (rc)
+ RETURN(rc);
+
+ obj = lmv_grab_obj(obd, cfid);
}
- obj = lmv_grab_obj(obd, cfid, 0);
+
if (obj) {
/* this is splitted dir and we'd want to get attrs */
CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation);
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation);
rc = lmv_revalidate_slaves(exp, reqp, cfid,
- it, 1, cb_blocking);
+ it, 1, cb_blocking);
} else if (S_ISDIR(body->mode)) {
/*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
(unsigned long) cfid->mds,
(unsigned long) cfid->id,
(unsigned long) cfid->generation);*/
}
- lmv_put_obj(obj);
+
+ if (obj)
+ lmv_put_obj(obj);
+
RETURN(rc);
}
ENTRY;
if (cfid) {
- /* caller wants to revalidate attrs of obj
- * we have to revalidate slaves if requested
- * object is splitted directory */
+ /* caller wants to revalidate attrs of obj. We have to revalidate
+ * slaves if requested object is splitted directory */
CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation);
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation);
mds = cfid->mds;
- obj = lmv_grab_obj(obd, cfid, 0);
+ obj = lmv_grab_obj(obd, cfid);
if (obj) {
- /* in fact, we need not this with current
- * _intent_lock(), but it may change some day */
+ /* in fact, we do not need this with current intent_lock(),
+ * but it may change some day */
rpfid = obj->objs[mds].fid;
+ lmv_put_obj(obj);
}
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
len, lmm, lmmsize, cfid, it, flags, reqp,
cb_blocking);
if (obj && rc >= 0) {
- /* this is splitted dir. in order to optimize things
- * a bit, we consider obj valid updating missing
- * parts. FIXME: do we need to return any lock here?
- * it would be fine if we don't. this means that
- * nobody should use UPDATE lock to notify about
- * object removal */
+ /* this is splitted dir. In order to optimize things a
+ * bit, we consider obj valid updating missing parts.
+ *
+ * FIXME: do we need to return any lock here? It would
+ * be fine if we don't. This means that nobody should
+ * use UPDATE lock to notify about object removal */
CDEBUG(D_OTHER,
"revalidate slaves for %lu/%lu/%lu, rc %d\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation, rc);
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation, rc);
+
rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
cb_blocking);
}
+
RETURN(rc);
}
CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
- len, name, (unsigned long) pfid->mds,
- (unsigned long) pfid->id,
- (unsigned long) pfid->generation);
+ len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id,
+ (unsigned long)pfid->generation);
mds = pfid->mds;
- obj = lmv_grab_obj(obd, pfid, 0);
+ obj = lmv_grab_obj(obd, pfid);
if (obj && len) {
/* directory is already splitted. calculate mds */
mds = raw_name2idx(obj->objcount, (char *) name, len);
rpfid = obj->objs[mds].fid;
+ lmv_put_obj(obj);
+
CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
- mds, (unsigned long) rpfid.mds,
- (unsigned long) rpfid.id,
- (unsigned long) rpfid.generation);
+ mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+ (unsigned long)rpfid.generation);
}
+
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
len, lmm, lmmsize, NULL, it, flags, reqp,
cb_blocking);
+
if (rc < 0)
RETURN(rc);
+
LASSERT(rc == 0);
/* okay, MDS has returned success. probably name has been
* resolved in remote inode */
- rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
- reqp, cb_blocking);
+ rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it,
+ flags, reqp, cb_blocking);
if (rc < 0)
RETURN(rc);
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
+
cfid = &body->fid1;
- obj2 = lmv_grab_obj(obd, cfid, 0);
+ obj2 = lmv_grab_obj(obd, cfid);
- if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
- /* wow! this is splitted dir, we'd like to handle it */
+ if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+ /* wow! this is splitted dir, we'd like to handle it. */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
- obj2 = lmv_grab_obj(obd, cfid, 0);
+
+ rc = lmv_create_obj(exp, &body->fid1, mea);
+ if (rc)
+ RETURN(rc);
+
+ obj2 = lmv_grab_obj(obd, cfid);
}
if (obj2) {
/* this is splitted dir and we'd want to get attrs */
- CDEBUG(D_OTHER,
- "attrs from slaves for %lu/%lu/%lu, rc %d\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation, rc);
- rc = lmv_revalidate_slaves(exp, reqp, cfid,
- it, 1, cb_blocking);
+ CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n",
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation, rc);
+
+ rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking);
+ lmv_put_obj(obj2);
}
RETURN(rc);
}
LASSERT(reqp);
LASSERT(*reqp);
- /* master is locked. we'd like to take locks on slaves
- * and update attributes to be returned from the slaves
- * it's important that lookup is called in two cases:
- * - for first time (dcache has no such a resolving yet
- * - ->d_revalidate() returned false
- * last case possible only if all the objs (master and
- * all slaves aren't valid */
+ /* master is locked. we'd like to take locks on slaves and update
+ * attributes to be returned from the slaves. It's important that lookup
+ * is called in two cases:
+ *
+ * - for the first time (dcache has no such a resolving yet);
+ * - ->d_revalidate() returned false.
+ *
+ * last case is possible only if all the objs (master and all slaves)
+ * aren't valid */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- obj = lmv_grab_obj(obd, &body->fid1, 0);
+ obj = lmv_grab_obj(obd, &body->fid1);
LASSERT(obj);
CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
- (unsigned long) body->fid1.mds,
- (unsigned long) body->fid1.id,
- (unsigned long) body->fid1.generation);
+ (unsigned long)body->fid1.mds,
+ (unsigned long)body->fid1.id,
+ (unsigned long)body->fid1.generation);
uctxt.gid1 = 0;
uctxt.gid2 = 0;
+
for (i = 0; i < obj->objcount; i++) {
struct ll_fid fid = obj->objs[i].fid;
struct ptlrpc_request *req = NULL;
struct lookup_intent it;
- if (fid_equal(&fid, &obj->fid)) {
+ if (fid_equal(&fid, &obj->fid))
/* skip master obj */
continue;
- }
CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
- (unsigned long) fid.mds,
- (unsigned long) fid.id,
- (unsigned long) fid.generation);
+ (unsigned long)fid.mds, (unsigned long)fid.id,
+ (unsigned long)fid.generation);
/* is obj valid? */
memset(&it, 0, sizeof(it));
goto release_lock;
}
- if (rc < 0) {
+ if (rc < 0)
/* error during revalidation */
GOTO(cleanup, rc);
- }
- /* rc == 0, this means we have no such a lock and can't
- * think obj is still valid. lookup it again */
+ /* rc == 0, this means we have no such lock and can't think
+ * obj is still valid. lookup it again */
LASSERT(req == NULL);
req = NULL;
memset(&it, 0, sizeof(it));
rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
NULL, 0, NULL, 0, NULL, &it, 0, &req,
lmv_dirobj_blocking_ast);
+
lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
LASSERT(rc <= 0);
- if (rc < 0) {
+
+ if (rc < 0)
/* error during lookup */
GOTO(cleanup, rc);
- }
lock = ldlm_handle2lock(lockh);
LASSERT(lock);
- lock->l_ast_data = obj;
- atomic_inc(&obj->count);
+
+ lock->l_ast_data = lmv_get_obj(obj);
body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
LASSERT(body2);
obj->objs[i].size = body2->size;
CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long) obj->objs[i].size);
+ (unsigned long) obj->objs[i].size);
LDLM_LOCK_PUT(lock);
ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
}
cleanup:
+ if (obj)
+ lmv_put_obj(obj);
RETURN(rc);
}
int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
- struct ll_fid *pfid, const char *name, int len,
- void *lmm, int lmmsize, struct ll_fid *cfid,
- struct lookup_intent *it, int flags,
- struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking)
+ struct ll_fid *pfid, const char *name, int len,
+ void *lmm, int lmmsize, struct ll_fid *cfid,
+ struct lookup_intent *it, int flags,
+ struct ptlrpc_request **reqp,
+ ldlm_blocking_callback cb_blocking)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
int rc, mds;
ENTRY;
- /* IT_LOOKUP is intended to produce name -> fid resolving
- * (let's call this lookup below) or to confirm requested
- * resolving is still valid (let's call this revalidation)
- * cfid != NULL specifies revalidation */
+ /* IT_LOOKUP is intended to produce name -> fid resolving (let's call
+ * this lookup below) or to confirm requested resolving is still valid
+ * (let's call this revalidation). cfid != NULL specifies revalidation */
if (cfid) {
- /* this is revalidation: we have to check is LOOKUP
- * lock still valid for given fid. very important
- * part is that we have to choose right mds because
- * namespace is per mds */
+ /* this is revalidation: we have to check if LOOKUP lock is still
+ * valid for given fid. very important part is that we have to
+ * choose right mds because namespace is per mds */
rpfid = *pfid;
- obj = lmv_grab_obj(obd, pfid, 0);
+ obj = lmv_grab_obj(obd, pfid);
if (obj) {
mds = raw_name2idx(obj->objcount, (char *) name, len);
rpfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
mds = rpfid.mds;
+
CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation, mds);
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation, mds);
+
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name,
len, lmm, lmmsize, cfid, it, flags,
reqp, cb_blocking);
mds = pfid->mds;
repeat:
- /* this is lookup. during lookup we have to update all the
- * attributes, because returned values will be put in struct
- * inode */
+ /* this is lookup. during lookup we have to update all the attributes,
+ * because returned values will be put in struct inode */
- obj = lmv_grab_obj(obd, pfid, 0);
+ obj = lmv_grab_obj(obd, pfid);
if (obj && len) {
/* directory is already splitted. calculate mds */
- mds = raw_name2idx(obj->objcount, (char *) name, len);
+ mds = raw_name2idx(obj->objcount, (char *)name, len);
rpfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
len, lmm, lmmsize, NULL, it, flags, reqp,
cb_blocking);
if (rc > 0) {
- /* very interesting. it seems object is still valid
- * but for some reason llite calls lookup, not revalidate */
+ /* very interesting. it seems object is still valid but for some
+ * reason llite calls lookup, not revalidate */
CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
- (unsigned long) rpfid.mds,
- (unsigned long) rpfid.id,
- (unsigned long) rpfid.generation);
+ (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+ (unsigned long)rpfid.generation);
+
LASSERT(*reqp == NULL);
RETURN(rc);
}
if (rc == 0 && *reqp == NULL) {
/* once again, we're asked for lookup, not revalidate */
CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
- (unsigned long) rpfid.mds,
- (unsigned long) rpfid.id,
- (unsigned long) rpfid.generation);
+ (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+ (unsigned long)rpfid.generation);
RETURN(rc);
}
* had to fail and lookup on dir had to return mea */
CWARN("we haven't knew about directory splitting!\n");
LASSERT(obj == NULL);
- rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
+ rc = lmv_create_obj(exp, &rpfid, NULL);
if (rc)
RETURN(rc);
goto repeat;
/* wow! this is splitted dir, we'd like to handle it */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- obj = lmv_grab_obj(obd, &body->fid1, 0);
- if (!obj)
- rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
- lmv_put_obj(obj);
+
+ if ((obj = lmv_grab_obj(obd, &body->fid1)))
+ lmv_put_obj(obj);
+ else
+ rc = lmv_create_obj(exp, &body->fid1, mea);
}
RETURN(rc);
}
int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
- struct ll_fid *pfid, const char *name, int len,
- void *lmm, int lmmsize, struct ll_fid *cfid,
- struct lookup_intent *it, int flags,
- struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking)
+ struct ll_fid *pfid, const char *name, int len,
+ void *lmm, int lmmsize, struct ll_fid *cfid,
+ struct lookup_intent *it, int flags,
+ struct ptlrpc_request **reqp,
+ ldlm_blocking_callback cb_blocking)
{
struct obd_device *obd = exp->exp_obd;
int rc = 0;
int i, rc = 0;
ENTRY;
- /* we have to loop over the subobjects, check validity and update
- * them from MDSs if needed. it's very useful that we need not to
- * update all the fields. say, common fields (that are equal on
- * all the subojects need not to be update, another fields (i_size,
- * for example) are cached all the time */
- obj = lmv_grab_obj(obd, mfid, 0);
+ /* we have to loop over the subobjects, check validity and update them
+ * from MDSs if needed. it's very useful that we need not update all
+ * the fields. say, common fields (that are equal on all the subobjects)
+ * need not be updated, other fields (i_size, for example) are
+ * cached all the time */
+ obj = lmv_grab_obj(obd, mfid);
LASSERT(obj);
- master_lock_mode = 0;
uctxt.gid1 = 0;
uctxt.gid2 = 0;
+ master_lock_mode = 0;
+
for (i = 0; i < obj->objcount; i++) {
struct ll_fid fid = obj->objs[i].fid;
struct lustre_handle *lockh = NULL;
int master = 0;
CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
- (unsigned long) fid.mds,
- (unsigned long) fid.id,
+ (unsigned long)fid.mds, (unsigned long)fid.id,
(unsigned long) fid.generation);
memset(&it, 0, sizeof(it));
/* lmv_intent_getattr() already checked
* validness and took the lock */
if (mreq) {
- /* it even got the reply
- * refresh attrs from that reply */
+ /* it even got the reply; refresh attrs
+ * from that reply */
body = lustre_msg_buf(mreq->rq_repmsg,
1,sizeof(*body));
LASSERT(body != NULL);
goto release_lock;
}
- if (rc < 0) {
+ if (rc < 0)
/* error during revalidation */
GOTO(cleanup, rc);
- }
- /* rc == 0, this means we have no such a lock and can't
- * think obj is still valid. lookup it again */
+ /* rc == 0, this means we have no such lock and can't think
+ * obj is still valid. lookup it again */
LASSERT(req == NULL);
req = NULL;
memset(&it, 0, sizeof(it));
NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
LASSERT(rc <= 0);
- if (rc < 0) {
+ if (rc < 0)
/* error during lookup */
GOTO(cleanup, rc);
- }
if (master) {
LASSERT(master_valid == 0);
/* this is slave. we want to control it */
lock = ldlm_handle2lock(lockh);
LASSERT(lock);
- lock->l_ast_data = obj;
- atomic_inc(&obj->count);
+ lock->l_ast_data = lmv_get_obj(obj);
LDLM_LOCK_PUT(lock);
}
(unsigned long) size);
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body);
+
/* FIXME: what about another attributes? */
body->size = size;
if (mreq == NULL) {
- /* very important to maintain lli->mds the same
- * because of revalidation. mreq == NULL means
- * that caller has no reply and the only attr
- * we can return is size */
+ /* very important to maintain lli->mds the same because
+ * of revalidation. mreq == NULL means that caller has
+ * no reply and the only attr we can return is size */
body->valid = OBD_MD_FLSIZE;
body->mds = obj->fid.mds;
}
rc = 1;
}
cleanup:
+ if (obj)
+ lmv_put_obj(obj);
RETURN(rc);
}
-
};
int lmv_dirobj_blocking_ast(struct ldlm_lock *,
- struct ldlm_lock_desc *, void *, int);
-struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
- struct ll_fid *fid, int create);
+ struct ldlm_lock_desc *,
+ void *, int);
+
void lmv_put_obj(struct lmv_obj *obj);
+struct lmv_obj *lmv_get_obj(struct lmv_obj *obj);
+
+int lmv_setup_mgr(struct obd_device *obd);
+void lmv_cleanup_mgr(struct obd_device *obd);
+
+struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
+ struct ll_fid *fid);
+
+int lmv_create_obj(struct obd_export *exp, struct ll_fid *fid,
+ struct mea *mea);
int lmv_intent_lock(struct obd_export *, struct ll_uctxt *,
struct ll_fid *, const char *, int, void *, int,
struct ll_fid *, const char *, int, void *, int,
struct ll_fid *, struct lookup_intent *, int,
struct ptlrpc_request **, ldlm_blocking_callback);
-int lmv_create_obj_from_attrs(struct obd_export *, struct ll_fid *,
- struct mea *);
+
int lmv_check_connect(struct obd_device *obd);
int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
struct ll_fid *, struct lookup_intent *, int,
ldlm_blocking_callback cb_blocking);
-void lmv_cleanup_objs(struct obd_device *obd);
+
int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *);
static inline struct mea *
static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
{
- struct lustre_cfg *lcfg = buf;
+ int i, rc = 0;
struct lmv_desc *desc;
- struct lmv_obd *lmv = &obd->u.lmv;
struct obd_uuid *uuids;
struct lmv_tgt_desc *tgts;
- int i;
- int rc = 0;
+ struct lustre_cfg *lcfg = buf;
+ struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
if (lcfg->lcfg_inllen1 < 1) {
OBD_ALLOC(lmv->tgts, lmv->bufsize);
if (lmv->tgts == NULL) {
CERROR("Out of memory\n");
- RETURN(-EINVAL);
+ RETURN(-ENOMEM);
}
lmv->desc = *desc;
for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
tgts->uuid = uuids[i];
+ lmv->max_cookiesize = 0;
+
lmv->max_easize = sizeof(struct ll_fid) *
desc->ld_tgt_count + sizeof(struct mea);
- lmv->max_cookiesize = 0;
+ rc = lmv_setup_mgr(obd);
+ if (rc) {
+ CERROR("Can't setup LMV object manager, "
+ "error %d.\n", rc);
+ OBD_FREE(lmv->tgts, lmv->bufsize);
+ }
RETURN(rc);
}
{
struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
- lmv_cleanup_objs(obd);
+ lmv_cleanup_mgr(obd);
OBD_FREE(lmv->tgts, lmv->bufsize);
RETURN(0);
}
}
static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid,
- unsigned long valid, unsigned int ea_size,
- struct ptlrpc_request **request)
+ unsigned long valid, unsigned int ea_size,
+ struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
int rc, i = fid->mds;
struct lmv_obj *obj;
ENTRY;
+
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- obj = lmv_grab_obj(obd, fid, 0);
+
+ obj = lmv_grab_obj(obd, fid);
+
CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n",
- (unsigned long) fid->mds,
- (unsigned long) fid->id,
- (unsigned long) fid->generation,
- obj ? "(splitted)" : "");
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation, obj ? "(splitted)" : "");
LASSERT(fid->mds < lmv->desc.ld_tgt_count);
rc = md_getattr(lmv->tgts[i].ltd_exp, fid,
- valid, ea_size, request);
+ valid, ea_size, request);
if (rc == 0 && obj) {
- /* we have to loop over dirobjs here and gather attrs
- * for all the slaves */
+ /* we have to loop over dirobjs here and gather attrs for all
+ * the slaves. */
#warning "attrs gathering here"
}
- lmv_put_obj(obj);
+
+ if (obj)
+ lmv_put_obj(obj);
+
RETURN(rc);
}
-static int lmv_change_cbdata(struct obd_export *exp,
- struct ll_fid *fid,
- ldlm_iterator_t it, void *data)
+static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
+ ldlm_iterator_t it, void *data)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
+
CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n",
- (unsigned long) fid->mds,
- (unsigned long) fid->id,
+ (unsigned long) fid->mds, (unsigned long) fid->id,
(unsigned long) fid->generation);
+
LASSERT(fid->mds < lmv->desc.ld_tgt_count);
- rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, fid, it, data);
+
+ rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp,
+ fid, it, data);
+
RETURN(rc);
}
/* this is default mds for directory name belongs to */
mds = pfid->mds;
- obj = lmv_grab_obj(obd, pfid, 0);
+ obj = lmv_grab_obj(obd, pfid);
if (obj) {
/* directory is splitted. look for right mds for this name */
mds = raw_name2idx(obj->objcount, name, len);
if (md.mea == NULL)
GOTO(cleanup, rc = -ENODATA);
- rc = lmv_create_obj_from_attrs(exp, fid, md.mea);
- obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea);
+ rc = lmv_create_obj(exp, fid, md.mea);
+ obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
cleanup:
if (req)
if (!lmv->desc.ld_active_tgt_count)
RETURN(-EIO);
repeat:
- obj = lmv_grab_obj(obd, &op_data->fid1, 0);
+ obj = lmv_grab_obj(obd, &op_data->fid1);
if (obj) {
mds = raw_name2idx(obj->objcount, op_data->name,
op_data->namelen);
memset(&data2, 0, sizeof(data2));
data2.fid1 = mea->mea_fids[i];
mds = data2.fid1.mds;
+
rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
&data2, lockh + i, lmm, lmmsize, cb_completion,
cb_blocking, cb_data);
+
CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n",
- (unsigned long) mea->mea_fids[i].mds,
- (unsigned long) mea->mea_fids[i].id,
- (unsigned long) mea->mea_fids[i].generation,
+ (unsigned long)mea->mea_fids[i].mds,
+ (unsigned long)mea->mea_fids[i].id,
+ (unsigned long)mea->mea_fids[i].generation,
rc, it->d.lustre.it_status);
if (rc)
GOTO(cleanup, rc);
}
if (data->namelen) {
- obj = lmv_grab_obj(obd, &data->fid1, 0);
+ obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
/* directory is splitted. look for
* right mds for this name */
if (rc)
RETURN(rc);
repeat:
- obj = lmv_grab_obj(obd, fid, 0);
+ obj = lmv_grab_obj(obd, fid);
if (obj) {
/* directory is splitted. look for right mds for this name */
mds = raw_name2idx(obj->objcount, filename, namelen - 1);
RETURN(rc);
if (data->namelen != 0) {
/* usual link request */
- obj = lmv_grab_obj(obd, &data->fid1, 0);
+ obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
rc = raw_name2idx(obj->objcount, data->name,
data->namelen);
goto request;
}
- obj = lmv_grab_obj(obd, &data->fid1, 0);
+ obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
/* directory is already splitted, so we have to forward
* request to the right MDS */
(unsigned long) obj->objs[mds].fid.mds,
(unsigned long) obj->objs[mds].fid.id,
(unsigned long) obj->objs[mds].fid.generation);
+ lmv_put_obj(obj);
}
- lmv_put_obj(obj);
- obj = lmv_grab_obj(obd, &data->fid2, 0);
+ obj = lmv_grab_obj(obd, &data->fid2);
if (obj) {
/* directory is already splitted, so we have to forward
* request to the right MDS */
(unsigned long) obj->objs[mds].fid.mds,
(unsigned long) obj->objs[mds].fid.id,
(unsigned long) obj->objs[mds].fid.generation);
+ lmv_put_obj(obj);
}
- lmv_put_obj(obj);
mds = data->fid1.mds;
}
int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
- struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
- struct ptlrpc_request **request)
+ struct iattr *iattr, void *ea, int ealen, void *ea2,
+ int ea2len, struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
if (rc)
RETURN(rc);
- obj = lmv_grab_obj(obd, &data->fid1, 0);
+ obj = lmv_grab_obj(obd, &data->fid1);
CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n",
(unsigned long) data->fid1.mds,
(unsigned long) data->fid1.id,
RETURN(rc);
}
-int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc, void *data, int flag)
+int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
{
struct lustre_handle lockh;
struct lmv_obj *obj;
case LDLM_CB_CANCELING:
/* time to drop cached attrs for dirobj */
obj = lock->l_ast_data;
- if (!obj)
- break;
-
- CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n",
- lock->l_resource->lr_name.name[3] == 1 ?
- "LOOKUP" : "UPDATE",
- (unsigned long) lock->l_resource->lr_name.name[0],
- (unsigned long) lock->l_resource->lr_name.name[1],
- (unsigned long) obj->fid.mds,
- (unsigned long) obj->fid.id,
- (unsigned long) obj->fid.generation);
+ if (obj) {
+ CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n",
+ lock->l_resource->lr_name.name[3] == 1 ? "LOOKUP" : "UPDATE",
+ (unsigned long)lock->l_resource->lr_name.name[0],
+ (unsigned long)lock->l_resource->lr_name.name[1],
+ (unsigned long)obj->fid.mds, (unsigned long)obj->fid.id,
+ (unsigned long)obj->fid.generation);
+ lmv_put_obj(obj);
+ }
break;
default:
LBUG();
(unsigned long) rfid.id,
(unsigned long) rfid.generation);
- obj = lmv_grab_obj(obd, mdc_fid, 0);
+ obj = lmv_grab_obj(obd, mdc_fid);
if (obj) {
/* find dirobj containing page with requested offset */
/* FIXME: what about protecting cached attrs here? */
offset -= obj->objs[i].size;
}
rfid = obj->objs[i].fid;
+ lmv_put_obj(obj);
+
CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n",
- (unsigned long) rfid.mds,
- (unsigned long) rfid.id,
- (unsigned long) rfid.generation,
- (unsigned long) offset);
+ (unsigned long)rfid.mds, (unsigned long)rfid.id,
+ (unsigned long)rfid.generation, (unsigned long)offset);
}
- rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, page, request);
- if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
- /* this page isn't from master object. to avoid
- * ./.. duplication in directory, we have to remove them
- * from all slave objects */
+ rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset,
+ page, request);
+
+ if (rc == 0 && !fid_equal(&rfid, mdc_fid))
+ /* this page isn't from master object. To avoid "." and ".."
+ * duplication in directory, we have to remove them from all
+ * slave objects */
lmv_remove_dots(page);
- }
-
- lmv_put_obj(obj);
-
+
RETURN(rc);
}
RETURN(rc);
} else if (data->namelen != 0) {
struct lmv_obj *obj;
- obj = lmv_grab_obj(obd, &data->fid1, 0);
+
+ obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
i = raw_name2idx(obj->objcount, data->name,
data->namelen);
}
int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
- struct lov_mds_md *disk_src, int mdsize)
+ struct lov_mds_md *disk_src, int mdsize)
{
struct obd_device *obd = class_exp2obd(exp);
struct lmv_obd *lmv = &obd->u.lmv;
}
int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *ea, obd_count oa_bufs,
- struct brw_page *pgarr, struct obd_trans_info *oti)
+ struct lov_stripe_md *ea, obd_count oa_bufs,
+ struct brw_page *pgarr, struct obd_trans_info *oti)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
#include <linux/obd_lmv.h>
#include "lmv_internal.h"
-
LIST_HEAD(lmv_obj_list);
spinlock_t lmv_obj_list_lock = SPIN_LOCK_UNLOCKED;
-struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
- struct ll_fid *fid, int create)
+/* creates new obj from passed @fid and @mea. */
+static struct lmv_obj *
+__lmv_alloc_obj(struct obd_device *obd, struct ll_fid *fid,
+ struct mea *mea)
{
+ int i;
+ struct lmv_obj *obj;
+ unsigned int obj_size;
struct lmv_obd *lmv = &obd->u.lmv;
- struct list_head *cur;
- struct lmv_obj *obj, *obj2;
-
- spin_lock(&lmv_obj_list_lock);
- list_for_each(cur, &lmv_obj_list) {
- obj = list_entry(cur, struct lmv_obj, list);
- if (obj->fid.mds == fid->mds && obj->fid.id == fid->id &&
- obj->fid.generation == fid->generation) {
- atomic_inc(&obj->count);
- spin_unlock(&lmv_obj_list_lock);
- RETURN(obj);
- }
- }
- spin_unlock(&lmv_obj_list_lock);
- if (!create)
- RETURN(NULL);
-
- /* no such object yet, allocate and initialize them */
OBD_ALLOC(obj, sizeof(*obj));
if (!obj)
- RETURN(NULL);
- atomic_set(&obj->count, 0);
- obj->fid = *fid;
+ return NULL;
+
obj->obd = obd;
+ obj->fid = *fid;
+
+ atomic_set(&obj->count, 0);
+ obj->objcount = mea->mea_count;
- OBD_ALLOC(obj->objs, sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
- if (!obj->objs) {
- OBD_FREE(obj, sizeof(*obj));
- RETURN(NULL);
- }
- memset(obj->objs, 0, sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
+ obj_size = sizeof(struct lmv_inode) *
+ lmv->desc.ld_tgt_count;
+
+ OBD_ALLOC(obj->objs, obj_size);
+ if (!obj->objs)
+ goto err_obj;
- spin_lock(&lmv_obj_list_lock);
- list_for_each(cur, &lmv_obj_list) {
- obj2 = list_entry(cur, struct lmv_obj, list);
- if (obj2->fid.mds == fid->mds && obj2->fid.id == fid->id &&
- obj2->fid.generation == fid->generation) {
- /* someone created it already */
- OBD_FREE(obj->objs,
- sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
- OBD_FREE(obj, sizeof(*obj));
-
- atomic_inc(&obj2->count);
- spin_unlock(&lmv_obj_list_lock);
- RETURN(obj2);
- }
+ memset(obj->objs, 0, obj_size);
+
+ /* put all fids in */
+ for (i = 0; i < mea->mea_count; i++) {
+ CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n",
+ (unsigned long)mea->mea_fids[i].mds,
+ (unsigned long)mea->mea_fids[i].id,
+ (unsigned long)mea->mea_fids[i].generation);
+ obj->objs[i].fid = mea->mea_fids[i];
}
- list_add(&obj->list, &lmv_obj_list);
- CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n",
- (unsigned long) fid->mds, (unsigned long) fid->id,
- (unsigned long) fid->generation);
- spin_unlock(&lmv_obj_list_lock);
- RETURN(obj);
+ return obj;
+err_obj:
+ OBD_FREE(obj, sizeof(*obj));
+ return NULL;
}
-void lmv_put_obj(struct lmv_obj *obj)
+/* destroys passed @obj. */
+static void
+__lmv_free_obj(struct lmv_obj *obj)
{
- if (!obj)
- return;
+ unsigned int obj_size;
+ struct lmv_obd *lmv = &obj->obd->u.lmv;
+
+ obj_size = sizeof(struct lmv_inode) *
+ lmv->desc.ld_tgt_count;
+
+ OBD_FREE(obj->objs, obj_size);
+ OBD_FREE(obj, sizeof(*obj));
+}
+
+struct lmv_obj *
+lmv_get_obj(struct lmv_obj *obj)
+{
+ LASSERT(obj);
+ atomic_inc(&obj->count);
+ return obj;
+}
+
+void
+lmv_put_obj(struct lmv_obj *obj)
+{
+ LASSERT(obj);
+
if (atomic_dec_and_test(&obj->count)) {
- CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu\n",
- (unsigned long) obj->fid.mds,
- (unsigned long) obj->fid.id,
- (unsigned long) obj->fid.generation);
+ struct ll_fid *fid = &obj->fid;
+ CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu - destroying\n",
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation);
+ __lmv_free_obj(obj);
}
}
-void lmv_cleanup_objs(struct obd_device *obd)
+static struct lmv_obj *
+__lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct list_head *cur, *tmp;
struct lmv_obj *obj;
+ struct list_head *cur;
- spin_lock(&lmv_obj_list_lock);
- list_for_each_safe(cur, tmp, &lmv_obj_list) {
+ list_for_each(cur, &lmv_obj_list) {
obj = list_entry(cur, struct lmv_obj, list);
- if (obj->obd != obd)
- continue;
+ if (fid_equal(&obj->fid, fid))
+ return lmv_get_obj(obj);
+ }
+ return NULL;
+}
- list_del(&obj->list);
- OBD_FREE(obj->objs,
- sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
- OBD_FREE(obj, sizeof(*obj));
+struct lmv_obj *
+lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid)
+{
+ struct lmv_obj *obj;
+ ENTRY;
+
+ spin_lock(&lmv_obj_list_lock);
+ obj = __lmv_grab_obj(obd, fid);
+ spin_unlock(&lmv_obj_list_lock);
+
+ RETURN(obj);
+}
+
+/* looks in objects list for an object that matches passed @fid. If it is not
+ * found -- creates it using passed @mea and adds it to the list. */
+static struct lmv_obj *
+__lmv_create_obj(struct obd_device *obd, struct ll_fid *fid,
+ struct mea *mea)
+{
+ struct lmv_obj *obj, *cobj;
+ ENTRY;
+
+ obj = lmv_grab_obj(obd, fid);
+ if (obj)
+ RETURN(obj);
+
+ /* no such object yet, allocate and initialize it. */
+ obj = __lmv_alloc_obj(obd, fid, mea);
+ if (!obj)
+ RETURN(NULL);
+
+ /* check if someone created it already while we were dealing with
+ * allocating @obj. */
+ spin_lock(&lmv_obj_list_lock);
+ cobj = __lmv_grab_obj(obd, fid);
+ if (cobj) {
+ /* someone created it already - free @obj and get out. */
+ __lmv_free_obj(obj);
+ spin_unlock(&lmv_obj_list_lock);
+ RETURN(cobj);
}
+
+ /* object is referenced by the list, so take an additional
+ * reference on it. */
+ lmv_get_obj(obj);
+ list_add(&obj->list, &lmv_obj_list);
spin_unlock(&lmv_obj_list_lock);
+
+ CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n",
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation);
+
+ RETURN(lmv_get_obj(obj));
}
-int lmv_create_obj_from_attrs(struct obd_export *exp,
- struct ll_fid *fid, struct mea *mea)
+int
+lmv_create_obj(struct obd_export *exp,
+ struct ll_fid *fid, struct mea *mea)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
CDEBUG(D_OTHER, "get mea for %lu/%lu/%lu and create lmv obj\n",
- (unsigned long) fid->mds, (unsigned long) fid->id,
- (unsigned long) fid->generation);
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation);
if (!mea) {
unsigned long valid;
rc = md_getattr(lmv->tgts[fid->mds].ltd_exp, fid,
valid, mealen, &req);
if (rc) {
- CERROR("md_getattr() failed, rc = %d\n", rc);
+ CERROR("md_getattr() failed, error %d\n", rc);
GOTO(cleanup, rc);
}
rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
if (rc) {
- CERROR("mdc_req2lustre_md() failed, rc = %d\n", rc);
+ CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
GOTO(cleanup, rc);
}
- if (md.mea == NULL)
+ if (!md.mea)
GOTO(cleanup, rc = -ENODATA);
mea = md.mea;
}
- /* got mea, now create obj for it */
- obj = lmv_grab_obj(obd, fid, 1);
- if (!obj)
+ /* got mea, now create obj for it. */
+ obj = __lmv_create_obj(obd, fid, mea);
+ if (!obj) {
+ CERROR("Can't create new object %lu/%lu/%lu\n",
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation);
GOTO(cleanup, rc = -ENOMEM);
-
- obj->objcount = mea->mea_count;
- /* put all fids in */
- for (i = 0; i < mea->mea_count; i++) {
- CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n",
- (unsigned long) mea->mea_fids[i].mds,
- (unsigned long) mea->mea_fids[i].id,
- (unsigned long) mea->mea_fids[i].generation);
- obj->objs[i].fid = mea->mea_fids[i];
- }
-
+ } else
+ lmv_put_obj(obj);
cleanup:
if (req)
ptlrpc_req_finished(req);
RETURN(rc);
}
+int
+lmv_setup_mgr(struct obd_device *obd)
+{
+ CWARN("LMV object manager setup\n");
+ return 0;
+}
+
+void
+lmv_cleanup_mgr(struct obd_device *obd)
+{
+ struct lmv_obj *obj;
+ struct list_head *cur, *tmp;
+ CWARN("LMV object manager cleanup\n");
+
+ spin_lock(&lmv_obj_list_lock);
+ list_for_each_safe(cur, tmp, &lmv_obj_list) {
+ obj = list_entry(cur, struct lmv_obj, list);
+
+ if (obj->obd != obd)
+ continue;
+
+ list_del(&obj->list);
+
+ if (atomic_read(&obj->count) > 1) {
+ struct ll_fid *fid = &obj->fid;
+
+ CERROR("Object %lu/%lu/%lu has invalid ref count %d\n",
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation,
+ atomic_read(&obj->count));
+ }
+
+ /* list does not use object anymore, ref counter should be
+ * decreased. */
+ lmv_put_obj(obj);
+ }
+ spin_unlock(&lmv_obj_list_lock);
+}