cfid = &body->fid1;
obj = lmv_grab_obj(obd, cfid);
- if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+ if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) {
/* wow! this is splitted dir, we'd like to handle it */
- rc = lmv_create_obj(exp, &body->fid1, mea);
- if (rc)
- RETURN(rc);
-
- obj = lmv_grab_obj(obd, cfid);
+ obj = lmv_create_obj(exp, &body->fid1, mea);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
}
if (obj) {
cfid = &body->fid1;
obj2 = lmv_grab_obj(obd, cfid);
- if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+ if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) {
/* wow! this is splitted dir, we'd like to handle it. */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- rc = lmv_create_obj(exp, &body->fid1, mea);
- if (rc)
- RETURN(rc);
-
- obj2 = lmv_grab_obj(obd, cfid);
+ obj2 = lmv_create_obj(exp, &body->fid1, mea);
+ if (IS_ERR(obj2))
+ RETURN(PTR_ERR(obj2));
}
if (obj2) {
LASSERT(body != NULL);
obj = lmv_grab_obj(obd, &body->fid1);
- LASSERT(obj);
+ LASSERT(obj != NULL);
CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
(unsigned long)body->fid1.mds,
uctxt.gid1 = 0;
uctxt.gid2 = 0;
+
+ lmv_lock_obj(obj);
for (i = 0; i < obj->objcount; i++) {
struct ll_fid fid = obj->objs[i].fid;
rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
NULL, 0, NULL, 0, &fid, &it, 0, &req,
lmv_dirobj_blocking_ast);
+
lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
if (rc > 0) {
/* nice, this slave is valid */
* obj is still valid. lookup it again */
LASSERT(req == NULL);
req = NULL;
+
memset(&it, 0, sizeof(it));
it.it_op = IT_GETATTR;
rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
obj->objs[i].size = body2->size;
CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long) obj->objs[i].size);
+ (unsigned long)obj->objs[i].size);
LDLM_LOCK_PUT(lock);
ptlrpc_req_finished(req);
release_lock:
lmv_update_body_from_obj(body, obj->objs + i);
+
if (it.d.lustre.it_lock_mode)
ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
}
cleanup:
- if (obj)
- lmv_put_obj(obj);
+ lmv_unlock_obj(obj);
+ lmv_put_obj(obj);
RETURN(rc);
}
}
if (rc == -ERESTART) {
- /* directory got splitted since last update. this shouldn't
- * be becasue splitting causes lock revocation, so revalidate
- * had to fail and lookup on dir had to return mea */
+ /* directory got splitted since last update. this shouldn't be
+ * becasue splitting causes lock revocation, so revalidate had
+ * to fail and lookup on dir had to return mea */
CWARN("we haven't knew about directory splitting!\n");
LASSERT(obj == NULL);
- rc = lmv_create_obj(exp, &rpfid, NULL);
- if (rc)
- RETURN(rc);
+
+ obj = lmv_create_obj(exp, &rpfid, NULL);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
+
goto repeat;
}
if (rc < 0)
RETURN(rc);
- /* okay, MDS has returned success. probably name has been
- * resolved in remote inode */
+ /* okay, MDS has returned success. probably name has been resolved in
+ * remote inode */
rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
reqp, cb_blocking);
- if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+ if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) {
/* wow! this is splitted dir, we'd like to handle it */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- if ((obj = lmv_grab_obj(obd, &body->fid1)))
+ obj = lmv_grab_obj(obd, &body->fid1);
+ if (!obj) {
+ obj = lmv_create_obj(exp, &body->fid1, mea);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
+ } else {
lmv_put_obj(obj);
- else
- rc = lmv_create_obj(exp, &body->fid1, mea);
+ }
}
RETURN(rc);
* need not to be update, another fields (i_size, for example) are
* cached all the time */
obj = lmv_grab_obj(obd, mfid);
- LASSERT(obj);
+ LASSERT(obj != NULL);
uctxt.gid1 = 0;
uctxt.gid2 = 0;
master_lock_mode = 0;
+
+ lmv_lock_obj(obj);
for (i = 0; i < obj->objcount; i++) {
struct ll_fid fid = obj->objs[i].fid;
* obj is still valid. lookup it again */
LASSERT(req == NULL);
req = NULL;
+
memset(&it, 0, sizeof(it));
it.it_op = IT_GETATTR;
rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
LASSERT(rc <= 0);
+
if (rc < 0)
/* error during lookup */
GOTO(cleanup, rc);
update:
obj->objs[i].size = body->size;
CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long) obj->objs[i].size);
+ (unsigned long)obj->objs[i].size);
if (req)
ptlrpc_req_finished(req);
}
if (*reqp) {
- /* some attrs got refreshed, we have reply and it's time
- * to put fresh attrs to it */
+ /* some attrs got refreshed, we have reply and it's time to put
+ * fresh attrs to it */
CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
(unsigned long) size);
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
rc = 1;
}
cleanup:
- if (obj)
- lmv_put_obj(obj);
+ lmv_unlock_obj(obj);
+ lmv_put_obj(obj);
RETURN(rc);
}
if (rc)
RETURN(rc);
+ LASSERT(i < lmv->desc.ld_tgt_count);
+
+ rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid,
+ ea_size, request);
+ if (rc)
+ RETURN(rc);
+
obj = lmv_grab_obj(obd, fid);
CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n",
(unsigned long)fid->mds, (unsigned long)fid->id,
(unsigned long)fid->generation, obj ? "(splitted)" : "");
- LASSERT(fid->mds < lmv->desc.ld_tgt_count);
- rc = md_getattr(lmv->tgts[i].ltd_exp, fid,
- valid, ea_size, request);
- if (rc == 0 && obj) {
+ if (obj) {
/* we have to loop over dirobjs here and gather attrs for all
* the slaves. */
#warning "attrs gathering here"
- }
-
- if (obj)
lmv_put_obj(obj);
+ }
RETURN(rc);
}
if (rc)
RETURN(rc);
- CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n",
- (unsigned long) fid->mds, (unsigned long) fid->id,
- (unsigned long) fid->generation);
+ CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", (unsigned long)fid->mds,
+ (unsigned long)fid->id, (unsigned long)fid->generation);
LASSERT(fid->mds < lmv->desc.ld_tgt_count);
struct lmv_obj *obj;
int rc = 0, mds;
ENTRY;
+
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
+
LASSERT(pfid->mds < lmv->desc.ld_tgt_count);
LASSERT(cfid->mds < lmv->desc.ld_tgt_count);
+
CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu:%*s -> %lu/%lu/%lu\n",
- (unsigned long) pfid->mds, (unsigned long) pfid->id,
- (unsigned long) pfid->generation, len, name,
- (unsigned long) cfid->mds, (unsigned long) cfid->id,
- (unsigned long) cfid->generation);
+ (unsigned long)pfid->mds, (unsigned long)pfid->id,
+ (unsigned long)pfid->generation, len, name,
+ (unsigned long)cfid->mds, (unsigned long)cfid->id,
+ (unsigned long)cfid->generation);
- /* this is default mds for directory name belongs to */
+ /* this is default mds for directory name belongs to. */
mds = pfid->mds;
obj = lmv_grab_obj(obd, pfid);
if (obj) {
- /* directory is splitted. look for right mds for this name */
+ /* directory is splitted. look for right mds for this name. */
mds = raw_name2idx(obj->objcount, name, len);
lmv_put_obj(obj);
}
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct ptlrpc_request *req = NULL;
+ struct lmv_obj *obj;
struct lustre_md md;
unsigned long valid;
int mealen, rc;
if (md.mea == NULL)
GOTO(cleanup, rc = -ENODATA);
- rc = lmv_create_obj(exp, fid, md.mea);
+ obj = lmv_create_obj(exp, fid, md.mea);
+ if (IS_ERR(obj))
+ rc = PTR_ERR(obj);
+
obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
cleanup:
}
CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
- op_data->namelen, op_data->name,
- (unsigned long) op_data->fid1.mds,
- (unsigned long) op_data->fid1.id,
- (unsigned long) op_data->fid1.generation);
+ op_data->namelen, op_data->name,
+ (unsigned long)op_data->fid1.mds,
+ (unsigned long)op_data->fid1.id,
+ (unsigned long)op_data->fid1.generation);
+
rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data,
datalen, mode, uid, gid, rdev, request);
if (rc == 0) {
+
if (*request == NULL)
- RETURN(rc);
+ RETURN(rc);
+
mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
sizeof(*mds_body));
LASSERT(mds_body != NULL);
+
CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
- (unsigned long) mds_body->fid1.id,
- (unsigned long) mds_body->fid1.generation,
+ (unsigned long)mds_body->fid1.id,
+ (unsigned long)mds_body->fid1.generation,
op_data->fid1.mds);
+
LASSERT(mds_body->valid & OBD_MD_MDS ||
mds_body->mds == op_data->fid1.mds);
} else if (rc == -ERESTART) {
- /* directory got splitted. time to update local object
- * and repeat the request with proper MDS */
+ /* directory got splitted. time to update local object and
+ * repeat the request with proper MDS */
rc = lmv_get_mea_and_update_object(exp, &op_data->fid1);
if (rc == 0) {
ptlrpc_req_finished(*request);
}
int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
- struct lookup_intent *it, int lockmode,
- struct mdc_op_data *data, struct lustre_handle *lockh,
- void *lmm, int lmmsize,
- ldlm_completion_callback cb_completion,
- ldlm_blocking_callback cb_blocking, void *cb_data)
+ struct lookup_intent *it, int lockmode,
+ struct mdc_op_data *data, struct lustre_handle *lockh,
+ void *lmm, int lmmsize, ldlm_completion_callback cb_completion,
+ ldlm_blocking_callback cb_blocking, void *cb_data)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
int lmv_enqueue(struct obd_export *exp, int lock_type,
struct lookup_intent *it, int lock_mode,
struct mdc_op_data *data, struct lustre_handle *lockh,
- void *lmm, int lmmsize,
- ldlm_completion_callback cb_completion,
+ void *lmm, int lmmsize, ldlm_completion_callback cb_completion,
ldlm_blocking_callback cb_blocking, void *cb_data)
{
struct obd_device *obd = exp->exp_obd;
if (data->namelen) {
obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
- /* directory is splitted. look for
- * right mds for this name */
+ /* directory is splitted. look for right mds for this
+ * name */
mds = raw_name2idx(obj->objcount, (char *)data->name,
data->namelen);
data->fid1 = obj->objs[mds].fid;
lmv_put_obj(obj);
}
}
- CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n",
- LL_IT2STR(it), (unsigned long) data->fid1.id,
- (unsigned long) data->fid1.generation);
+ CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", LL_IT2STR(it),
+ (unsigned long)data->fid1.id, (unsigned long)data->fid1.generation);
+
rc = md_enqueue(lmv->tgts[data->fid1.mds].ltd_exp, lock_type, it,
lock_mode, data, lockh, lmm, lmmsize, cb_completion,
cb_blocking, cb_data);
}
int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid,
- char *filename, int namelen, unsigned long valid,
- unsigned int ea_size, struct ptlrpc_request **request)
+ char *filename, int namelen, unsigned long valid,
+ unsigned int ea_size, struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
rfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
+
CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu -> %lu/%lu/%lu\n",
- namelen, filename, (unsigned long) fid->mds,
- (unsigned long) fid->id, (unsigned long) fid->generation,
- (unsigned long) rfid.mds, (unsigned long) rfid.id,
- (unsigned long) rfid.generation);
- rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, namelen,
- valid, ea_size, request);
+ namelen, filename, (unsigned long)fid->mds,
+ (unsigned long)fid->id, (unsigned long)fid->generation,
+ (unsigned long)rfid.mds, (unsigned long)rfid.id,
+ (unsigned long)rfid.generation);
+
+ rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename,
+ namelen, valid, ea_size, request);
if (rc == 0) {
- /* this could be cross-node reference. in this case all
- * we have right now is mds/ino/generation triple. we'd
- * like to find other attributes */
+ /* this could be cross-node reference. in this case all we have
+ * right now is mds/ino/generation triple. we'd like to find
+ * other attributes */
body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
LASSERT(body != NULL);
if (body->valid & OBD_MD_MDS) {
*request = req;
}
} else if (rc == -ERESTART) {
- /* directory got splitted. time to update local object
- * and repeat the request with proper MDS */
+ /* directory got splitted. time to update local object and
+ * repeat the request with proper MDS */
rc = lmv_get_mea_and_update_object(exp, &rfid);
if (rc == 0) {
ptlrpc_req_finished(*request);
/*
- * llite passes fid of an target inode in data->fid1 and
- * fid of directory in data->fid2
+ * llite passes fid of an target inode in data->fid1 and fid of directory in
+ * data->fid2
*/
int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
struct ptlrpc_request **request)
struct lmv_obj *obj;
int rc;
ENTRY;
+
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
+
if (data->namelen != 0) {
/* usual link request */
obj = lmv_grab_obj(obd, &data->fid1);
data->fid1 = obj->objs[rc].fid;
lmv_put_obj(obj);
}
- CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
- (unsigned) data->fid2.mds, (unsigned) data->fid2.id,
- (unsigned) data->fid2.generation, data->namelen,
- data->name, (unsigned) data->fid1.mds,
- (unsigned) data->fid1.id,
- (unsigned) data->fid1.generation, data->fid1.mds);
+
+ CDEBUG(D_OTHER,"link %lu/%lu/%lu:%*s to %lu/%lu/%lu mds %lu\n",
+ (unsigned long)data->fid2.mds,
+ (unsigned long)data->fid2.id,
+ (unsigned long)data->fid2.generation,
+ data->namelen, data->name,
+ (unsigned long)data->fid1.mds,
+ (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation,
+ (unsigned long)data->fid1.mds);
} else {
/* request from MDS to acquire i_links for inode by fid1 */
- CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
- (unsigned) data->fid1.mds, (unsigned) data->fid1.id,
- (unsigned) data->fid1.generation);
+ CDEBUG(D_OTHER, "inc i_nlinks for %lu/%lu/%lu\n",
+ (unsigned long)data->fid1.mds,
+ (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation);
}
rc = md_link(lmv->tgts[data->fid1.mds].ltd_exp, data, request);
ENTRY;
CDEBUG(D_OTHER, "rename %*s in %lu/%lu/%lu to %*s in %lu/%lu/%lu\n",
- oldlen, old, (unsigned long) data->fid1.mds,
- (unsigned long) data->fid1.id,
- (unsigned long) data->fid1.generation,
+ oldlen, old, (unsigned long)data->fid1.mds,
+ (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation,
newlen, new, (unsigned long) data->fid2.mds,
(unsigned long) data->fid2.id,
(unsigned long) data->fid2.generation);
+
if (!fid_equal(&data->fid1, &data->fid2))
CWARN("cross-node rename %lu/%lu/%lu:%*s to %lu/%lu/%lu:%*s\n",
- (unsigned long) data->fid1.mds,
- (unsigned long) data->fid1.id,
- (unsigned long) data->fid1.generation, oldlen, old,
- (unsigned long) data->fid2.mds,
- (unsigned long) data->fid2.id,
- (unsigned long) data->fid2.generation, newlen, new);
+ (unsigned long)data->fid1.mds,
+ (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation, oldlen, old,
+ (unsigned long)data->fid2.mds,
+ (unsigned long)data->fid2.id,
+ (unsigned long)data->fid2.generation, newlen, new);
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
if (oldlen == 0) {
- /* MDS with old dir entry is asking another MDS
- * to create name there */
+ /* MDS with old dir entry is asking another MDS to create name
+ * there */
CDEBUG(D_OTHER,
"create %*s(%d/%d) in %lu/%lu/%lu pointing to %lu/%lu/%lu\n",
newlen, new, oldlen, newlen,
- (unsigned long) data->fid2.mds,
- (unsigned long) data->fid2.id,
- (unsigned long) data->fid2.generation,
- (unsigned long) data->fid1.mds,
- (unsigned long) data->fid1.id,
- (unsigned long) data->fid1.generation);
+ (unsigned long)data->fid2.mds,
+ (unsigned long)data->fid2.id,
+ (unsigned long)data->fid2.generation,
+ (unsigned long)data->fid1.mds,
+ (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation);
mds = data->fid2.mds;
goto request;
}
obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
- /* directory is already splitted, so we have to forward
- * request to the right MDS */
+ /* directory is already splitted, so we have to forward request
+ * to the right MDS */
mds = raw_name2idx(obj->objcount, (char *)old, oldlen);
data->fid1 = obj->objs[mds].fid;
CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
- (unsigned long) obj->objs[mds].fid.mds,
- (unsigned long) obj->objs[mds].fid.id,
- (unsigned long) obj->objs[mds].fid.generation);
+ (unsigned long)obj->objs[mds].fid.mds,
+ (unsigned long)obj->objs[mds].fid.id,
+ (unsigned long)obj->objs[mds].fid.generation);
lmv_put_obj(obj);
}
obj = lmv_grab_obj(obd, &data->fid2);
if (obj) {
- /* directory is already splitted, so we have to forward
- * request to the right MDS */
+ /* directory is already splitted, so we have to forward request
+ * to the right MDS */
mds = raw_name2idx(obj->objcount, (char *)new, newlen);
data->fid2 = obj->objs[mds].fid;
CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
- (unsigned long) obj->objs[mds].fid.mds,
- (unsigned long) obj->objs[mds].fid.id,
- (unsigned long) obj->objs[mds].fid.generation);
+ (unsigned long)obj->objs[mds].fid.mds,
+ (unsigned long)obj->objs[mds].fid.id,
+ (unsigned long)obj->objs[mds].fid.generation);
lmv_put_obj(obj);
}
RETURN(rc);
obj = lmv_grab_obj(obd, &data->fid1);
+
CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n",
- (unsigned long) data->fid1.mds,
- (unsigned long) data->fid1.id,
- (unsigned long) data->fid1.generation, iattr->ia_valid,
+ (unsigned long)data->fid1.mds, (unsigned long)data->fid1.id,
+ (unsigned long)data->fid1.generation, iattr->ia_valid,
obj ? ", splitted" : "");
+
if (obj) {
for (i = 0; i < obj->objcount; i++) {
data->fid1 = obj->objs[i].fid;
- rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea,
- ealen, ea2, ea2len, &req);
- LASSERT(rc == 0);
+
+ rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr,
+ ea, ealen, ea2, ea2len, &req);
+ if (rc) {
+ lmv_put_obj(obj);
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+ }
+
if (fid_equal(&obj->fid, &obj->objs[i].fid)) {
- /* this is master object and this request
- * should be returned back to llite */
+ /* this is master object and this request should
+ * be returned back to llite */
*request = req;
} else {
ptlrpc_req_finished(req);
}
lmv_put_obj(obj);
} else {
- LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count);
+ LASSERT(i < lmv->desc.ld_tgt_count);
rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen,
ea2, ea2len, request);
if (rc == 0) {
mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
- sizeof(*mds_body));
+ sizeof(*mds_body));
LASSERT(mds_body != NULL);
LASSERT(mds_body->mds == i);
}
obj = lmv_grab_obj(obd, mdc_fid);
if (obj) {
- /* find dirobj containing page with requested offset */
- /* FIXME: what about protecting cached attrs here? */
+ lmv_lock_obj(obj);
+
+ /* find dirobj containing page with requested offset. */
for (i = 0; i < obj->objcount; i++) {
if (offset < obj->objs[i].size)
break;
offset -= obj->objs[i].size;
}
rfid = obj->objs[i].fid;
+
+ lmv_unlock_obj(obj);
lmv_put_obj(obj);
CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n",