Whamcloud - gitweb
d0bff3742db6a9370cb1caf1e7502ae2c4590707
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38
39 #include <lustre/lustre_idl.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_dlm.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46 #include "lmv_internal.h"
47
48 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
49 {
50         if (it->d.lustre.it_lock_mode != 0)
51                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
52                                  it->d.lustre.it_lock_mode);
53 }
54
55 int lmv_intent_remote(struct obd_export *exp, void *lmm,
56                       int lmmsize, struct lookup_intent *it,
57                       int flags, struct ptlrpc_request **reqp,
58                       ldlm_blocking_callback cb_blocking,
59                       int extra_lock_flags)
60 {
61         struct obd_device *obd = exp->exp_obd;
62         struct lmv_obd *lmv = &obd->u.lmv;
63         struct ptlrpc_request *req = NULL;
64         struct mdt_body *body = NULL;
65         struct lustre_handle plock;
66         struct md_op_data *op_data;
67         struct obd_export *tgt_exp;
68         int pmode, rc = 0;
69         ENTRY;
70
71         body = lustre_msg_buf((*reqp)->rq_repmsg,
72                               DLM_REPLY_REC_OFF, sizeof(*body));
73         LASSERT(body != NULL);
74
75         if (!(body->valid & OBD_MD_MDS))
76                 RETURN(0);
77
78         /*
79          * oh, MDS reports that this is remote inode case i.e. we have to ask
80          * for real attrs on another MDS.
81          */
82         if (it->it_op & IT_LOOKUP) {
83                 /*
84                  * unfortunately, we have to lie to MDC/MDS to retrieve
85                  * attributes llite needs.
86                  */
87                 it->it_op = IT_GETATTR;
88         }
89
90         /* we got LOOKUP lock, but we really need attrs */
91         pmode = it->d.lustre.it_lock_mode;
92         if (pmode) {
93                 memcpy(&plock, &it->d.lustre.it_lock_handle,
94                        sizeof(plock));
95                 it->d.lustre.it_lock_mode = 0;
96                 it->d.lustre.it_data = 0;
97         }
98
99         LASSERT(fid_is_sane(&body->fid1));
100
101         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
102
103         OBD_ALLOC_PTR(op_data);
104         if (op_data == NULL)
105                 GOTO(out, rc = -ENOMEM);
106         
107         op_data->fid1 = body->fid1;
108
109         tgt_exp = lmv_get_export(lmv, &body->fid1);
110         if (IS_ERR(tgt_exp))
111                 RETURN(PTR_ERR(tgt_exp));
112
113         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
114                             &req, cb_blocking, extra_lock_flags);
115
116         /*
117          * llite needs LOOKUP lock to track dentry revocation in order to
118          * maintain dcache consistency. Thus drop UPDATE lock here and put
119          * LOOKUP in request.
120          */
121         if (rc == 0) {
122                 lmv_drop_intent_lock(it);
123                 memcpy(&it->d.lustre.it_lock_handle, &plock,
124                        sizeof(plock));
125                 it->d.lustre.it_lock_mode = pmode;
126         }
127
128         OBD_FREE_PTR(op_data);
129         EXIT;
130 out:
131         if (rc && pmode)
132                 ldlm_lock_decref(&plock, pmode);
133
134         ptlrpc_req_finished(*reqp);
135         *reqp = req;
136         return rc;
137 }
138
139 /*
140  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
141  * may be split dir.
142  */
143 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
144                     void *lmm, int lmmsize, struct lookup_intent *it,
145                     int flags, struct ptlrpc_request **reqp,
146                     ldlm_blocking_callback cb_blocking,
147                     int extra_lock_flags)
148 {
149         struct obd_device *obd = exp->exp_obd;
150         struct lu_fid rpid = op_data->fid1;
151         struct lmv_obd *lmv = &obd->u.lmv;
152         struct mdt_body *body = NULL;
153         struct md_op_data *sop_data;
154         struct lmv_stripe_md *mea;
155         struct lmv_obj *obj;
156         int rc, loop = 0;
157         mdsno_t mds;
158         ENTRY;
159
160         OBD_ALLOC_PTR(sop_data);
161         if (sop_data == NULL)
162                 RETURN(-ENOMEM);
163         
164         /* save op_data fro repeat case */
165         *sop_data = *op_data;
166         
167 repeat:
168         LASSERT(++loop <= 2);
169         rc = lmv_fld_lookup(lmv, &rpid, &mds);
170         if (rc)
171                 GOTO(out_free_sop_data, rc);
172         obj = lmv_obj_grab(obd, &rpid);
173         if (obj) {
174                 /*
175                  * Directory is already split, so we have to forward request to
176                  * the right MDS.
177                  */
178                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
179                                    (char *)op_data->name, op_data->namelen);
180
181                 rpid = obj->lo_inodes[mds].li_fid;
182                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
183                 lmv_obj_put(obj);
184                 if (rc) 
185                         GOTO(out_free_sop_data, rc);
186                 
187                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
188                        mds, PFID(&rpid));
189         }
190
191         sop_data->fid1 = rpid;
192
193         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
194                             lmm, lmmsize, it, flags, reqp,
195                             cb_blocking, extra_lock_flags);
196         if (rc == -ERESTART) {
197                 /*
198                  * Directory got split. Time to update local object and repeat
199                  * the request with proper MDS.
200                  */
201                 LASSERT(lu_fid_eq(&op_data->fid1, &rpid));
202                 rc = lmv_handle_split(exp, &rpid);
203                 if (rc == 0) {
204                         ptlrpc_req_finished(*reqp);
205                         goto repeat;
206                 }
207         }
208         if (rc != 0)
209                 GOTO(out_free_sop_data, rc);
210
211         /*
212          * Okay, MDS has returned success. Probably name has been resolved in
213          * remote inode.
214          */
215         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
216                                cb_blocking, extra_lock_flags);
217         if (rc != 0) {
218                 LASSERT(rc < 0);
219
220                 /*
221                  * This is possible, that some userspace application will try to
222                  * open file as directory and we will have -ENOTDIR here. As
223                  * this is "usual" situation, we should not print error here,
224                  * only debug info.
225                  */
226                 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
227                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->fid2),
228                        PFID(&rpid), op_data->namelen, op_data->name, rc);
229                 GOTO(out_free_sop_data, rc);
230         }
231
232         /*
233          * nothing is found, do not access body->fid1 as it is zero and thus
234          * pointless.
235          */
236         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
237             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
238             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
239                 GOTO(out_free_sop_data, rc = 0);
240
241         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
242          * to update them for split dir */
243         body = lustre_msg_buf((*reqp)->rq_repmsg,
244                               DLM_REPLY_REC_OFF, sizeof(*body));
245         LASSERT(body != NULL);
246
247         /* could not find object, FID is not present in response. */
248         if (!(body->valid & OBD_MD_FLID))
249                 GOTO(out_free_sop_data, rc = 0);
250
251         obj = lmv_obj_grab(obd, &body->fid1);
252         if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
253                 /* wow! this is split dir, we'd like to handle it */
254                 obj = lmv_obj_create(exp, &body->fid1, mea);
255                 if (IS_ERR(obj))
256                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
257         }
258
259         if (obj) {
260                 /* this is split dir and we'd want to get attrs */
261                 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
262                        PFID(&body->fid1));
263
264                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
265                                            cb_blocking, extra_lock_flags);
266         } else if (S_ISDIR(body->mode)) {
267                 CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
268                        PFID(&body->fid1));
269         }
270
271         if (obj)
272                 lmv_obj_put(obj);
273
274         EXIT;
275 out_free_sop_data:
276         OBD_FREE_PTR(sop_data);
277         return rc;
278 }
279
280 int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
281                        void *lmm, int lmmsize, struct lookup_intent *it,
282                        int flags, struct ptlrpc_request **reqp,
283                        ldlm_blocking_callback cb_blocking,
284                        int extra_lock_flags)
285 {
286         struct lmv_obj *obj = NULL, *obj2 = NULL;
287         struct obd_device *obd = exp->exp_obd;
288         struct lu_fid rpid = op_data->fid1;
289         struct lmv_obd *lmv = &obd->u.lmv;
290         struct mdt_body *body = NULL;
291         struct md_op_data *sop_data;
292         struct lmv_stripe_md *mea;
293         mdsno_t mds;
294         int rc = 0;
295         ENTRY;
296
297         OBD_ALLOC_PTR(sop_data);
298         if (sop_data == NULL)
299                 RETURN(-ENOMEM);
300         
301         /* save op_data fro repeat case */
302         *sop_data = *op_data;
303         
304         if (fid_is_sane(&op_data->fid2)) {
305                 /*
306                  * Caller wants to revalidate attrs of obj we have to revalidate
307                  * slaves if requested object is split directory.
308                  */
309                 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
310                        PFID(&op_data->fid2));
311                 
312                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
313                 if (rc)
314                         GOTO(out_free_sop_data, rc);
315 #if 0
316                 /*
317                  * In fact, we do not need this with current intent_lock(), but
318                  * it may change some day.
319                  */
320                 obj = lmv_obj_grab(obd, &op_data->fid2);
321                 if (obj) {
322                         if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
323                                 rpid = obj->lo_inodes[mds].li_fid;
324                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
325                                 if (rc) {
326                                         lmv_obj_put(obj);
327                                         GOTO(out_free_sop_data, rc);
328                                 }
329                         }
330                         lmv_obj_put(obj);
331                 }
332 #endif
333         } else {
334                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
335                        op_data->namelen, op_data->name,
336                        PFID(&op_data->fid1));
337                 
338                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
339                 if (rc)
340                         GOTO(out_free_sop_data, rc);
341                 obj = lmv_obj_grab(obd, &op_data->fid1);
342                 if (obj && op_data->namelen) {
343                         /* directory is already split. calculate mds */
344                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
345                                            (char *)op_data->name,
346                                            op_data->namelen);
347                         
348                         rpid = obj->lo_inodes[mds].li_fid;
349                         rc = lmv_fld_lookup(lmv, &rpid, &mds);
350                         if (rc) {
351                                 lmv_obj_put(obj);
352                                 GOTO(out_free_sop_data, rc);
353                         }
354                         lmv_obj_put(obj);
355
356                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
357                                mds, PFID(&rpid));
358                 }
359         }
360
361         sop_data->fid1 = rpid;
362
363         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
364                             lmmsize, it, flags, reqp, cb_blocking,
365                             extra_lock_flags);
366         if (rc < 0)
367                 GOTO(out_free_sop_data, rc);
368
369         if (obj && rc > 0) {
370                 /*
371                  * This is split dir. In order to optimize things a bit, we
372                  * consider obj valid updating missing parts.
373
374                  * FIXME: do we need to return any lock here? It would be fine
375                  * if we don't. This means that nobody should use UPDATE lock to
376                  * notify about object * removal.
377                  */
378                 CDEBUG(D_OTHER,
379                        "revalidate slaves for "DFID", rc %d\n",
380                        PFID(&op_data->fid2), rc);
381
382                 LASSERT(fid_is_sane(&op_data->fid2));
383                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->fid2, it, rc,
384                                            cb_blocking, extra_lock_flags);
385                 GOTO(out_free_sop_data, rc);
386         }
387
388         if (*reqp == NULL)
389                 GOTO(out_free_sop_data, rc);
390
391         /*
392          * okay, MDS has returned success. Probably name has been resolved in
393          * remote inode.
394          */
395         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
396                                reqp, cb_blocking, extra_lock_flags);
397         if (rc < 0)
398                 GOTO(out_free_sop_data, rc);
399
400         /*
401          * Nothing is found, do not access body->fid1 as it is zero and thus
402          * pointless.
403          */
404         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
405                 GOTO(out_free_sop_data, rc = 0);
406
407         LASSERT(*reqp);
408         LASSERT((*reqp)->rq_repmsg);
409         body = lustre_msg_buf((*reqp)->rq_repmsg,
410                               DLM_REPLY_REC_OFF, sizeof(*body));
411         LASSERT(body != NULL);
412
413         /* could not find object, FID is not present in response. */
414         if (!(body->valid & OBD_MD_FLID))
415                 GOTO(out_free_sop_data, rc = 0);
416
417         obj2 = lmv_obj_grab(obd, &body->fid1);
418
419         if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
420                 /* wow! this is split dir, we'd like to handle it. */
421                 obj2 = lmv_obj_create(exp, &body->fid1, mea);
422                 if (IS_ERR(obj2))
423                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
424         }
425
426         if (obj2) {
427                 /* this is split dir and we'd want to get attrs */
428                 CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
429                        PFID(&body->fid1), rc);
430
431                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
432                                            cb_blocking, extra_lock_flags);
433                 lmv_obj_put(obj2);
434         }
435
436         EXIT;
437 out_free_sop_data:
438         OBD_FREE_PTR(sop_data);
439         return rc;
440 }
441
442 void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
443 {
444         /* update size */
445         body->size += lino->li_size;
446 }
447
448 /* this is not used currently */
449 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
450 {
451         struct obd_device *obd = exp->exp_obd;
452         struct lmv_obd *lmv = &obd->u.lmv;
453         struct mdt_body *body = NULL;
454         struct lustre_handle *lockh;
455         struct md_op_data *op_data;
456         struct ldlm_lock *lock;
457         struct mdt_body *body2;
458         struct lmv_obj *obj;
459         int i, rc = 0;
460         ENTRY;
461
462         LASSERT(reqp);
463         LASSERT(*reqp);
464
465         /* master is locked. we'd like to take locks on slaves and update
466          * attributes to be returned from the slaves it's important that lookup
467          * is called in two cases:
468
469          *  - for first time (dcache has no such a resolving yet).  -
470          *  ->d_revalidate() returned false.
471
472          * last case possible only if all the objs (master and all slaves aren't
473          * valid */
474
475         body = lustre_msg_buf((*reqp)->rq_repmsg,
476                               DLM_REPLY_REC_OFF, sizeof(*body));
477         LASSERT(body != NULL);
478         LASSERT((body->valid & OBD_MD_FLID) != 0);
479
480         obj = lmv_obj_grab(obd, &body->fid1);
481         LASSERT(obj != NULL);
482
483         CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
484                PFID(&body->fid1));
485
486         OBD_ALLOC_PTR(op_data);
487         if (op_data == NULL)
488                 RETURN(-ENOMEM);
489         
490         lmv_obj_lock(obj);
491
492         for (i = 0; i < obj->lo_objcount; i++) {
493                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
494                 struct ptlrpc_request *req = NULL;
495                 struct obd_export *tgt_exp;
496                 struct lookup_intent it;
497
498                 if (lu_fid_eq(&fid, &obj->lo_fid))
499                         /* skip master obj */
500                         continue;
501
502                 CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
503
504                 /* is obj valid? */
505                 memset(&it, 0, sizeof(it));
506                 it.it_op = IT_GETATTR;
507
508                 memset(op_data, 0, sizeof(*op_data));
509                 op_data->fid1 = fid;
510                 op_data->fid2 = fid;
511
512                 tgt_exp = lmv_get_export(lmv, &fid);
513                 if (IS_ERR(tgt_exp))
514                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
515
516                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req,
517                                     lmv_blocking_ast, 0);
518
519                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
520                 if (rc > 0 && req == NULL) {
521                         /* nice, this slave is valid */
522                         LASSERT(req == NULL);
523                         CDEBUG(D_OTHER, "cached\n");
524                         goto release_lock;
525                 }
526
527                 if (rc < 0) {
528                         /* error during lookup */
529                         GOTO(cleanup, rc);
530                 }
531                 lock = ldlm_handle2lock(lockh);
532                 LASSERT(lock);
533
534                 lock->l_ast_data = lmv_obj_get(obj);
535
536                 body2 = lustre_msg_buf(req->rq_repmsg,
537                                        DLM_REPLY_REC_OFF, sizeof(*body2));
538                 LASSERT(body2);
539
540                 obj->lo_inodes[i].li_size = body2->size;
541
542                 CDEBUG(D_OTHER, "fresh: %lu\n",
543                        (unsigned long)obj->lo_inodes[i].li_size);
544
545                 LDLM_LOCK_PUT(lock);
546
547                 if (req)
548                         ptlrpc_req_finished(req);
549 release_lock:
550                 lmv_update_body(body, obj->lo_inodes + i);
551
552                 if (it.d.lustre.it_lock_mode)
553                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
554         }
555
556         EXIT;
557 cleanup:
558         OBD_FREE_PTR(op_data);
559         lmv_obj_unlock(obj);
560         lmv_obj_put(obj);
561         return rc;
562 }
563
564 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
565                       void *lmm, int lmmsize, struct lookup_intent *it,
566                       int flags, struct ptlrpc_request **reqp,
567                       ldlm_blocking_callback cb_blocking,
568                       int extra_lock_flags)
569 {
570         struct obd_device *obd = exp->exp_obd;
571         struct lu_fid rpid = op_data->fid1;
572         struct lmv_obd *lmv = &obd->u.lmv;
573         struct mdt_body *body = NULL;
574         struct md_op_data *sop_data;
575         struct lmv_stripe_md *mea;
576         struct lmv_obj *obj;
577         int rc, loop = 0;
578         mdsno_t mds;
579         ENTRY;
580
581         OBD_ALLOC_PTR(sop_data);
582         if (sop_data == NULL)
583                 RETURN(-ENOMEM);
584         
585         /* save op_data fro repeat case */
586         *sop_data = *op_data;
587         
588         /*
589          * IT_LOOKUP is intended to produce name -> fid resolving (let's call
590          * this lookup below) or to confirm requested resolving is still valid
591          * (let's call this revalidation) fid_is_sane(&sop_data->fid2) specifies
592          * revalidation.
593          */
594         if (fid_is_sane(&op_data->fid2)) {
595                 /*
596                  * This is revalidate: we have to check is LOOKUP lock still
597                  * valid for given fid. Very important part is that we have to
598                  * choose right mds because namespace is per mds.
599                  */
600                 rpid = op_data->fid1;
601                 obj = lmv_obj_grab(obd, &rpid);
602                 if (obj) {
603                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
604                                            (char *)op_data->name,
605                                            op_data->namelen);
606                         rpid = obj->lo_inodes[mds].li_fid;
607                         lmv_obj_put(obj);
608                 }
609                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
610                 if (rc)
611                         GOTO(out_free_sop_data, rc);
612
613                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
614                        PFID(&op_data->fid2), mds);
615         } else {
616                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
617                 if (rc)
618                         GOTO(out_free_sop_data, rc);
619 repeat:
620                 LASSERT(++loop <= 2);
621
622                 /*
623                  * This is lookup. During lookup we have to update all the
624                  * attributes, because returned values will be put in struct
625                  * inode.
626                  */
627                 obj = lmv_obj_grab(obd, &op_data->fid1);
628                 if (obj) {
629                         if (op_data->namelen) {
630                                 /* directory is already split. calculate mds */
631                                 mds = raw_name2idx(obj->lo_hashtype,
632                                                    obj->lo_objcount,
633                                                    (char *)op_data->name,
634                                                    op_data->namelen);
635                                 rpid = obj->lo_inodes[mds].li_fid;
636                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
637                                 if (rc) {
638                                         lmv_obj_put(obj);
639                                         GOTO(out_free_sop_data, rc);
640                                 }
641                         }
642                         lmv_obj_put(obj);
643                 }
644                 fid_zero(&op_data->fid2);
645         }
646
647         sop_data->fid1 = rpid;
648
649         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
650                             it, flags, reqp, cb_blocking, extra_lock_flags);
651         if (rc > 0) {
652                 LASSERT(fid_is_sane(&op_data->fid2));
653                 GOTO(out_free_sop_data, rc);
654         }
655         if (rc > 0) {
656                 /*
657                  * Very interesting. it seems object is still valid but for some
658                  * reason llite calls lookup, not revalidate.
659                  */
660                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
661                       PFID(&rpid));
662                 LASSERT(*reqp == NULL);
663                 GOTO(out_free_sop_data, rc);
664         }
665
666         if (rc == 0 && *reqp == NULL) {
667                 /* once again, we're asked for lookup, not revalidate */
668                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
669                       PFID(&rpid));
670                 GOTO(out_free_sop_data, rc);
671         }
672
673         if (rc == -ERESTART) {
674                 /*
675                  * Directory got split since last update. This shouldn't be
676                  * becasue splitting causes lock revocation, so revalidate had
677                  * to fail and lookup on dir had to return mea.
678                  */
679                 CWARN("we haven't knew about directory splitting!\n");
680                 LASSERT(obj == NULL);
681
682                 obj = lmv_obj_create(exp, &rpid, NULL);
683                 if (IS_ERR(obj))
684                         RETURN((int)PTR_ERR(obj));
685                 lmv_obj_put(obj);
686                 goto repeat;
687         }
688
689         if (rc < 0)
690                 GOTO(out_free_sop_data, rc);
691
692         /*
693          * Okay, MDS has returned success. Probably name has been resolved in
694          * remote inode.
695          */
696         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
697                                cb_blocking, extra_lock_flags);
698
699         if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
700                 /* wow! this is split dir, we'd like to handle it */
701                 body = lustre_msg_buf((*reqp)->rq_repmsg,
702                                       DLM_REPLY_REC_OFF, sizeof(*body));
703                 LASSERT(body != NULL);
704                 LASSERT((body->valid & OBD_MD_FLID) != 0);
705
706                 obj = lmv_obj_grab(obd, &body->fid1);
707                 if (!obj) {
708                         obj = lmv_obj_create(exp, &body->fid1, mea);
709                         if (IS_ERR(obj))
710                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
711                 }
712                 lmv_obj_put(obj);
713         }
714
715         EXIT;
716 out_free_sop_data:
717         OBD_FREE_PTR(sop_data);
718         return rc;
719 }
720
721 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
722                     void *lmm, int lmmsize, struct lookup_intent *it,
723                     int flags, struct ptlrpc_request **reqp,
724                     ldlm_blocking_callback cb_blocking,
725                     int extra_lock_flags)
726 {
727         struct obd_device *obd = exp->exp_obd;
728         int rc;
729         ENTRY;
730
731         LASSERT(it != NULL);
732         LASSERT(fid_is_sane(&op_data->fid1));
733         
734         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
735                LL_IT2STR(it), op_data->namelen, op_data->name,
736                PFID(&op_data->fid1));
737
738         rc = lmv_check_connect(obd);
739         if (rc)
740                 RETURN(rc);
741
742         if (it->it_op & IT_LOOKUP)
743                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
744                                        flags, reqp, cb_blocking,
745                                        extra_lock_flags);
746         else if (it->it_op & IT_OPEN)
747                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
748                                      flags, reqp, cb_blocking,
749                                      extra_lock_flags);
750         else if (it->it_op & IT_GETATTR)
751                 rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
752                                         flags, reqp, cb_blocking,
753                                         extra_lock_flags);
754         else
755                 LBUG();
756         RETURN(rc);
757 }
758
759 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
760                           const struct lu_fid *mid, struct lookup_intent *oit,
761                           int master_valid, ldlm_blocking_callback cb_blocking,
762                           int extra_lock_flags)
763 {
764         struct obd_device *obd = exp->exp_obd;
765         struct ptlrpc_request *mreq = *reqp;
766         struct lmv_obd *lmv = &obd->u.lmv;
767         struct lustre_handle master_lockh;
768         struct obd_export *tgt_exp;
769         struct md_op_data *op_data;
770         struct ldlm_lock *lock;
771         unsigned long size = 0;
772         struct mdt_body *body;
773         struct lmv_obj *obj;
774         int master_lock_mode;
775         int i, rc = 0;
776         ENTRY;
777
778         OBD_ALLOC_PTR(op_data);
779         if (op_data == NULL)
780                 RETURN(-ENOMEM);
781         
782         /* we have to loop over the subobjects, check validity and update them
783          * from MDSs if needed. it's very useful that we need not to update all
784          * the fields. say, common fields (that are equal on all the subojects
785          * need not to be update, another fields (i_size, for example) are
786          * cached all the time */
787         obj = lmv_obj_grab(obd, mid);
788         LASSERT(obj != NULL);
789
790         master_lock_mode = 0;
791
792         lmv_obj_lock(obj);
793
794         for (i = 0; i < obj->lo_objcount; i++) {
795                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
796                 struct lustre_handle *lockh = NULL;
797                 struct ptlrpc_request *req = NULL;
798                 ldlm_blocking_callback cb;
799                 struct lookup_intent it;
800                 int master = 0;
801
802                 CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
803                        PFID(&fid));
804
805                 memset(op_data, 0, sizeof(*op_data));
806                 memset(&it, 0, sizeof(it));
807                 it.it_op = IT_GETATTR;
808
809                 cb = lmv_blocking_ast;
810
811                 if (lu_fid_eq(&fid, &obj->lo_fid)) {
812                         if (master_valid) {
813                                 /* lmv_intent_getattr() already checked
814                                  * validness and took the lock */
815                                 if (mreq) {
816                                         /* it even got the reply refresh attrs
817                                          * from that reply */
818                                         body = lustre_msg_buf(mreq->rq_repmsg,
819                                                               DLM_REPLY_REC_OFF, 
820                                                               sizeof(*body));
821                                         LASSERT(body != NULL);
822                                         goto update;
823                                 }
824                                 /* take already cached attrs into account */
825                                 CDEBUG(D_OTHER,
826                                        "master is locked and cached\n");
827                                 goto release_lock;
828                         }
829                         master = 1;
830                         cb = cb_blocking;
831                 }
832
833                 op_data->fid1 = fid;
834                 op_data->fid2 = fid;
835
836                 /* is obj valid? */
837                 tgt_exp = lmv_get_export(lmv, &fid);
838                 if (IS_ERR(tgt_exp))
839                         GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp));
840
841                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
842                                     extra_lock_flags);
843                 
844                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
845                 if (rc > 0 && req == NULL) {
846                         /* nice, this slave is valid */
847                         LASSERT(req == NULL);
848                         CDEBUG(D_OTHER, "cached\n");
849                         goto release_lock;
850                 }
851
852                 if (rc < 0) {
853                         /* error during revalidation */
854                         GOTO(cleanup, rc);
855                 }
856                 if (master) {
857                         LASSERT(master_valid == 0);
858                         /* save lock on master to be returned to the caller */
859                         CDEBUG(D_OTHER, "no lock on master yet\n");
860                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
861                         master_lock_mode = it.d.lustre.it_lock_mode;
862                         it.d.lustre.it_lock_mode = 0;
863                 } else {
864                         /* this is slave. we want to control it */
865                         lock = ldlm_handle2lock(lockh);
866                         LASSERT(lock);
867                         lock->l_ast_data = lmv_obj_get(obj);
868                         LDLM_LOCK_PUT(lock);
869                 }
870
871                 if (*reqp == NULL) {
872                         /* this is first reply, we'll use it to return updated
873                          * data back to the caller */
874                         LASSERT(req);
875                         ptlrpc_request_addref(req);
876                         *reqp = req;
877
878                 }
879
880                 body = lustre_msg_buf(req->rq_repmsg,
881                                       DLM_REPLY_REC_OFF, sizeof(*body));
882                 LASSERT(body);
883
884 update:
885                 obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) * 
886                         (i + 1);
887
888                 CDEBUG(D_OTHER, "fresh: %lu\n",
889                        (unsigned long)obj->lo_inodes[i].li_size);
890
891                 if (req)
892                         ptlrpc_req_finished(req);
893 release_lock:
894                 size += obj->lo_inodes[i].li_size;
895
896                 if (it.d.lustre.it_lock_mode)
897                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
898         }
899
900         if (*reqp) {
901                 /* some attrs got refreshed, we have reply and it's time to put
902                  * fresh attrs to it */
903                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
904                        (unsigned long)size);
905
906                 body = lustre_msg_buf((*reqp)->rq_repmsg, 
907                                       DLM_REPLY_REC_OFF, sizeof(*body));
908                 LASSERT(body);
909
910                 body->size = size;
911
912                 if (mreq == NULL) {
913                         /*
914                          * very important to maintain mds num the same because
915                          * of revalidation. mreq == NULL means that caller has
916                          * no reply and the only attr we can return is size.
917                          */
918                         body->valid = OBD_MD_FLSIZE;
919                         
920 #if 0
921                         rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds);
922                         if (rc)
923                                 GOTO(cleanup, rc);
924 #endif
925                 }
926                 if (master_valid == 0) {
927                         memcpy(&oit->d.lustre.it_lock_handle,
928                                &master_lockh, sizeof(master_lockh));
929                         oit->d.lustre.it_lock_mode = master_lock_mode;
930                 }
931                 rc = 0;
932         } else {
933                 /* it seems all the attrs are fresh and we did no request */
934                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
935                 if (master_valid == 0)
936                         oit->d.lustre.it_lock_mode = master_lock_mode;
937                 rc = 1;
938         }
939
940         EXIT;
941 cleanup:
942         lmv_obj_unlock(obj);
943         lmv_obj_put(obj);
944 out_free_op_data:
945         OBD_FREE_PTR(op_data);
946         return rc;
947 }