- struct obd_device *obd = exp->exp_obd;
- struct ptlrpc_request *mreq = *reqp;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle master_lockh;
- struct ldlm_lock *lock;
- unsigned long size = 0;
- struct mds_body *body;
- struct ll_uctxt uctxt;
- struct lmv_obj *obj;
- int master_lock_mode;
- int i, rc = 0;
- ENTRY;
-
- /* we have to loop over the subobjects, check validity and update them
- * from MDSs if needed. it's very useful that we need not to update all
- * the fields. say, common fields (that are equal on all the subojects
- * need not to be update, another fields (i_size, for example) are
- * cached all the time */
- obj = lmv_grab_obj(obd, mfid);
- LASSERT(obj != NULL);
-
- uctxt.gid1 = 0;
- uctxt.gid2 = 0;
- master_lock_mode = 0;
-
- lmv_lock_obj(obj);
-
- for (i = 0; i < obj->objcount; i++) {
- struct ll_fid fid = obj->objs[i].fid;
- struct lustre_handle *lockh = NULL;
- struct ptlrpc_request *req = NULL;
- ldlm_blocking_callback cb;
- struct lookup_intent it;
- int master = 0;
-
- CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
- (unsigned long)fid.mds, (unsigned long)fid.id,
- (unsigned long) fid.generation);
-
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
- cb = lmv_dirobj_blocking_ast;
-
- if (fid_equal(&fid, &obj->fid)) {
- if (master_valid) {
- /* lmv_intent_getattr() already checked
- * validness and took the lock */
- if (mreq) {
- /* it even got the reply refresh attrs
- * from that reply */
- body = lustre_msg_buf(mreq->rq_repmsg,
- 1, sizeof(*body));
- LASSERT(body != NULL);
- goto update;
- }
- /* take already cached attrs into account */
- CDEBUG(D_OTHER,
- "master is locked and cached\n");
- goto release_lock;
- }
- master = 1;
- cb = cb_blocking;
- }
-
- /* is obj valid? */
- rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
- NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
- lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
- if (rc > 0) {
- /* nice, this slave is valid */
- LASSERT(req == NULL);
- CDEBUG(D_OTHER, "cached\n");
- goto release_lock;
- }
-
- if (rc < 0)
- /* error during revalidation */
- GOTO(cleanup, rc);
-
- /* rc == 0, this means we have no such a lock and can't think
- * obj is still valid. lookup it again */
- LASSERT(req == NULL);
- req = NULL;
-
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
- rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
- NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
- lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
- LASSERT(rc <= 0);
-
- if (rc < 0)
- /* error during lookup */
- GOTO(cleanup, rc);
-
- if (master) {
- LASSERT(master_valid == 0);
- /* save lock on master to be returned to the caller */
- CDEBUG(D_OTHER, "no lock on master yet\n");
- memcpy(&master_lockh, lockh, sizeof(master_lockh));
- master_lock_mode = it.d.lustre.it_lock_mode;
- it.d.lustre.it_lock_mode = 0;
- } else {
- /* this is slave. we want to control it */
- lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
- lock->l_ast_data = lmv_get_obj(obj);
- LDLM_LOCK_PUT(lock);
- }
-
- if (*reqp == NULL) {
- /* this is first reply, we'll use it to return
- * updated data back to the caller */
- LASSERT(req);
- ptlrpc_request_addref(req);
- *reqp = req;
-
- }
-
- body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
- LASSERT(body);
-
-update:
- obj->objs[i].size = body->size;
-
- CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long)obj->objs[i].size);
-
- if (req)
- ptlrpc_req_finished(req);
-release_lock:
- size += obj->objs[i].size;
-
- if (it.d.lustre.it_lock_mode)
- ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
- }
-
- if (*reqp) {
- /* some attrs got refreshed, we have reply and it's time to put
- * fresh attrs to it */
- CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
- (unsigned long)size);
-
- body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
- LASSERT(body);
-
- /* FIXME: what about other attributes? */
- body->size = size;
-
- if (mreq == NULL) {
- /* very important to maintain lli->mds the same because
- * of revalidation. mreq == NULL means that caller has
- * no reply and the only attr we can return is size */
- body->valid = OBD_MD_FLSIZE;
- body->mds = obj->fid.mds;
- }
- if (master_valid == 0) {
- memcpy(&oit->d.lustre.it_lock_handle,
- &master_lockh, sizeof(master_lockh));
- oit->d.lustre.it_lock_mode = master_lock_mode;
- }
- rc = 0;
- } else {
- /* it seems all the attrs are fresh and we did no request */
- CDEBUG(D_OTHER, "all the attrs were fresh\n");
- if (master_valid == 0)
- oit->d.lustre.it_lock_mode = master_lock_mode;
- rc = 1;
- }
-cleanup:
- lmv_unlock_obj(obj);
- lmv_put_obj(obj);
- RETURN(rc);