- struct obd_device *obd = exp->exp_obd;
- struct ptlrpc_request *mreq = *reqp;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle master_lockh;
- unsigned long size = 0;
- struct ldlm_lock *lock;
- struct mds_body *body;
- struct ll_uctxt uctxt;
- struct lmv_obj *obj;
- int master_lock_mode;
- int i, rc = 0;
- ENTRY;
-
- /* we have to loop over the subobjects, check validity and update
- * them from MDSs if needed. it's very useful that we need not to
- * update all the fields. say, common fields (that are equal on
- * all the subojects need not to be update, another fields (i_size,
- * for example) are cached all the time */
- obj = lmv_grab_obj(obd, mfid, 0);
- LASSERT(obj);
-
- master_lock_mode = 0;
- uctxt.gid1 = 0;
- uctxt.gid2 = 0;
- for (i = 0; i < obj->objcount; i++) {
- struct ll_fid fid = obj->objs[i].fid;
- struct lustre_handle *lockh = NULL;
- struct ptlrpc_request *req = NULL;
- ldlm_blocking_callback cb;
- struct lookup_intent it;
- int master = 0;
-
- CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
- (unsigned long) fid.mds,
- (unsigned long) fid.id,
- (unsigned long) fid.generation);
-
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
- cb = lmv_dirobj_blocking_ast;
-
- if (fid_equal(&fid, &obj->fid)) {
- if (master_valid) {
- /* lmv_intent_getattr() already checked
- * validness and took the lock */
- if (mreq) {
- /* it even got the reply
- * refresh attrs from that reply */
- body = lustre_msg_buf(mreq->rq_repmsg,
- 1,sizeof(*body));
- LASSERT(body != NULL);
- goto update;
- }
- /* take already cached attrs into account */
- CDEBUG(D_OTHER,
- "master is locked and cached\n");
- goto release_lock;
- }
- master = 1;
- cb = cb_blocking;
- }
-
- /* is obj valid? */
- rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
- NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
- lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
- if (rc > 0) {
- /* nice, this slave is valid */
- LASSERT(req == NULL);
- CDEBUG(D_OTHER, "cached\n");
- goto release_lock;
- }
-
- if (rc < 0) {
- /* error during revalidation */
- GOTO(cleanup, rc);
- }
-
- /* rc == 0, this means we have no such a lock and can't
- * think obj is still valid. lookup it again */
- LASSERT(req == NULL);
- req = NULL;
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
- rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
- NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
- lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
- LASSERT(rc <= 0);
- if (rc < 0) {
- /* error during lookup */
- GOTO(cleanup, rc);
- }
-
- if (master) {
- LASSERT(master_valid == 0);
- /* save lock on master to be returned to the caller */
- CDEBUG(D_OTHER, "no lock on master yet\n");
- memcpy(&master_lockh, lockh, sizeof(master_lockh));
- master_lock_mode = it.d.lustre.it_lock_mode;
- it.d.lustre.it_lock_mode = 0;
- } else {
- /* this is slave. we want to control it */
- lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
- lock->l_ast_data = obj;
- atomic_inc(&obj->count);
- LDLM_LOCK_PUT(lock);
- }
-
- if (*reqp == NULL) {
- /* this is first reply, we'll use it to return
- * updated data back to the caller */
- LASSERT(req);
- ptlrpc_request_addref(req);
- *reqp = req;
-
- }
-
- body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
- LASSERT(body);
-
-update:
- obj->objs[i].size = body->size;
- CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long) obj->objs[i].size);
-
- if (req)
- ptlrpc_req_finished(req);
-release_lock:
- size += obj->objs[i].size;
- if (it.d.lustre.it_lock_mode)
- ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
- }
-
- if (*reqp) {
- /* some attrs got refreshed, we have reply and it's time
- * to put fresh attrs to it */
- CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
- (unsigned long) size);
- body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
- LASSERT(body);
- /* FIXME: what about another attributes? */
- body->size = size;
- if (mreq == NULL) {
- /* very important to maintain lli->mds the same
- * because of revalidation. mreq == NULL means
- * that caller has no reply and the only attr
- * we can return is size */
- body->valid = OBD_MD_FLSIZE;
- body->mds = obj->fid.mds;
- }
- if (master_valid == 0) {
- memcpy(&oit->d.lustre.it_lock_handle,
- &master_lockh, sizeof(master_lockh));
- oit->d.lustre.it_lock_mode = master_lock_mode;
- }
- rc = 0;
- } else {
- /* it seems all the attrs are fresh and we did no request */
- CDEBUG(D_OTHER, "all the attrs were fresh\n");
- if (master_valid == 0)
- oit->d.lustre.it_lock_mode = master_lock_mode;
- rc = 1;
- }
-cleanup:
- RETURN(rc);
+ int rc;
+ ENTRY;
+
+ LASSERT(it != NULL);
+ LASSERT(fid_is_sane(&op_data->op_fid1));
+
+ CDEBUG(D_INODE, "INTENT LOCK '%s' for "DFID" '%.*s' on "DFID"\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid2),
+ (int)op_data->op_namelen, op_data->op_name,
+ PFID(&op_data->op_fid1));
+
+ if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT | IT_GETXATTR))
+ rc = lmv_intent_lookup(exp, op_data, it, reqp, cb_blocking,
+ extra_lock_flags);
+ else if (it->it_op & IT_OPEN)
+ rc = lmv_intent_open(exp, op_data, it, reqp, cb_blocking,
+ extra_lock_flags);
+ else
+ LBUG();
+
+ if (rc < 0) {
+ struct lustre_handle lock_handle;
+
+ if (it->it_lock_mode != 0) {
+ lock_handle.cookie = it->it_lock_handle;
+ ldlm_lock_decref_and_cancel(&lock_handle,
+ it->it_lock_mode);
+ }
+
+ it->it_lock_handle = 0;
+ it->it_lock_mode = 0;
+
+ if (it->it_remote_lock_mode != 0) {
+ lock_handle.cookie = it->it_remote_lock_handle;
+ ldlm_lock_decref_and_cancel(&lock_handle,
+ it->it_remote_lock_mode);
+ }
+
+ it->it_remote_lock_handle = 0;
+ it->it_remote_lock_mode = 0;
+ }
+
+ RETURN(rc);