Whamcloud - gitweb
Branch b_new_cmd
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38
39 #include <lustre/lustre_idl.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_dlm.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46 #include "lmv_internal.h"
47
48 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
49 {
50         if (it->d.lustre.it_lock_mode != 0)
51                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
52                                  it->d.lustre.it_lock_mode);
53 }
54
55 int lmv_intent_remote(struct obd_export *exp, void *lmm,
56                       int lmmsize, struct lookup_intent *it,
57                       int flags, struct ptlrpc_request **reqp,
58                       ldlm_blocking_callback cb_blocking,
59                       int extra_lock_flags)
60 {
61         struct obd_device *obd = exp->exp_obd;
62         struct lmv_obd *lmv = &obd->u.lmv;
63         struct ptlrpc_request *req = NULL;
64         struct mdt_body *body = NULL;
65         struct lustre_handle plock;
66         struct md_op_data *op_data;
67         struct obd_export *tgt_exp;
68         int pmode, rc = 0;
69         ENTRY;
70
71         body = lustre_msg_buf((*reqp)->rq_repmsg,
72                               DLM_REPLY_REC_OFF, sizeof(*body));
73         LASSERT(body != NULL);
74
75         if (!(body->valid & OBD_MD_MDS))
76                 RETURN(0);
77
78         /*
79          * oh, MDS reports that this is remote inode case i.e. we have to ask
80          * for real attrs on another MDS.
81          */
82         if (it->it_op & IT_LOOKUP) {
83                 /*
84                  * unfortunately, we have to lie to MDC/MDS to retrieve
85                  * attributes llite needs.
86                  */
87                 it->it_op = IT_GETATTR;
88         }
89
90         /* we got LOOKUP lock, but we really need attrs */
91         pmode = it->d.lustre.it_lock_mode;
92         if (pmode) {
93                 memcpy(&plock, &it->d.lustre.it_lock_handle, sizeof(plock));
94                 it->d.lustre.it_lock_mode = 0;
95                 it->d.lustre.it_data = 0;
96         }
97
98         LASSERT(fid_is_sane(&body->fid1));
99
100         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
101
102         OBD_ALLOC_PTR(op_data);
103         if (op_data == NULL)
104                 GOTO(out, rc = -ENOMEM);
105         
106         op_data->fid1 = body->fid1;
107
108         tgt_exp = lmv_get_export(lmv, &body->fid1);
109         if (IS_ERR(tgt_exp))
110                 RETURN(PTR_ERR(tgt_exp));
111
112         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
113                             &req, cb_blocking, extra_lock_flags);
114
115         /*
116          * llite needs LOOKUP lock to track dentry revocation in order to
117          * maintain dcache consistency. Thus drop UPDATE lock here and put
118          * LOOKUP in request.
119          */
120         if (rc == 0) {
121                 lmv_drop_intent_lock(it);
122                 memcpy(&it->d.lustre.it_lock_handle, &plock, sizeof(plock));
123                 it->d.lustre.it_lock_mode = pmode;
124         }
125
126         OBD_FREE_PTR(op_data);
127         EXIT;
128 out:
129         if (rc && pmode)
130                 ldlm_lock_decref(&plock, pmode);
131
132         ptlrpc_req_finished(*reqp);
133         *reqp = req;
134         return rc;
135 }
136
137 int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
138                             struct md_op_data *op, struct lu_fid *fid)
139 {
140         struct lmv_obj *obj;
141         struct lmv_obd *lmv = &obd->u.lmv;
142         struct lu_fid *rpid;
143         mdsno_t mds;
144         int rc;
145         ENTRY;
146
147         obj = lmv_obj_grab(obd, pid);
148         if (!obj)
149                RETURN(0);
150         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
151                            (char *)op->name, op->namelen);
152         rpid = &obj->lo_inodes[mds].li_fid;
153         rc = lmv_fld_lookup(lmv, rpid, &mds);
154         if (rc)
155                 GOTO(cleanup, rc);
156         
157         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
158         if (rc < 0)
159                 GOTO(cleanup, rc);
160         if (rc > 0) {
161                 LASSERT(fid_is_sane(fid));
162                 rc = fld_client_create(&lmv->lmv_fld,
163                                        fid_seq(fid), mds, NULL);
164                 if (rc) {
165                         CERROR("can't create fld rc%d\n", rc);
166                         GOTO(cleanup, rc);
167                 }
168         }
169         CDEBUG(D_INFO, "Allocate new fid"DFID"for split obj\n",PFID(fid));
170 cleanup:
171         lmv_obj_put(obj);
172         RETURN(rc);
173 }
174
175 /*
176  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
177  * may be split dir.
178  */
179 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
180                     void *lmm, int lmmsize, struct lookup_intent *it,
181                     int flags, struct ptlrpc_request **reqp,
182                     ldlm_blocking_callback cb_blocking,
183                     int extra_lock_flags)
184 {
185         struct obd_device *obd = exp->exp_obd;
186         struct lu_fid rpid = op_data->fid1;
187         struct lmv_obd *lmv = &obd->u.lmv;
188         struct mdt_body *body = NULL;
189         struct md_op_data *sop_data;
190         struct lmv_stripe_md *mea;
191         struct lmv_obj *obj;
192         int rc, loop = 0;
193         mdsno_t mds;
194         ENTRY;
195
196         OBD_ALLOC_PTR(sop_data);
197         if (sop_data == NULL)
198                 RETURN(-ENOMEM);
199         
200         /* save op_data fro repeat case */
201         *sop_data = *op_data;
202         
203 repeat:
204         LASSERT(++loop <= 2);
205         rc = lmv_fld_lookup(lmv, &rpid, &mds);
206         if (rc)
207                 GOTO(out_free_sop_data, rc);
208         
209         obj = lmv_obj_grab(obd, &rpid);
210         if (obj) {
211                 /*
212                  * Directory is already split, so we have to forward request to
213                  * the right MDS.
214                  */
215                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
216                                    (char *)op_data->name, op_data->namelen);
217
218                 rpid = obj->lo_inodes[mds].li_fid;
219                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
220                 lmv_obj_put(obj);
221                 if (rc) 
222                         GOTO(out_free_sop_data, rc);
223                 
224                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
225                        mds, PFID(&rpid));
226         }
227
228         sop_data->fid1 = rpid;
229
230         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
231                             lmm, lmmsize, it, flags, reqp,
232                             cb_blocking, extra_lock_flags);
233         
234         if (rc == -ERESTART) {
235                 /*
236                  * Directory got split. Time to update local object and repeat
237                  * the request with proper MDS.
238                  */
239                 LASSERT(lu_fid_eq(&op_data->fid1, &rpid));
240                 rc = lmv_handle_split(exp, &rpid);
241                 if (rc == 0) {
242                         ptlrpc_req_finished(*reqp);
243                        /* We shoudld reallocate the FID for the object */
244                         rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
245                                                      &sop_data->fid2);
246                         if (rc)
247                                 GOTO(out_free_sop_data, rc);
248                         /* client switches to new sequence, setup fld */
249                         goto repeat;
250                 }
251         } else if (rc == -ESTALE && it->d.lustre.it_lock_mode) {
252                 struct lustre_handle *handle;
253                 /* cross-ref open can have lookup lock on child */
254                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
255                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
256         }
257
258         if (rc != 0)
259                 GOTO(out_free_sop_data, rc);
260
261         /*
262          * Okay, MDS has returned success. Probably name has been resolved in
263          * remote inode.
264          */
265         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
266                                cb_blocking, extra_lock_flags);
267         if (rc != 0) {
268                 LASSERT(rc < 0);
269
270                 /*
271                  * This is possible, that some userspace application will try to
272                  * open file as directory and we will have -ENOTDIR here. As
273                  * this is "usual" situation, we should not print error here,
274                  * only debug info.
275                  */
276                 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
277                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->fid2),
278                        PFID(&rpid), op_data->namelen, op_data->name, rc);
279                 GOTO(out_free_sop_data, rc);
280         }
281
282         /*
283          * nothing is found, do not access body->fid1 as it is zero and thus
284          * pointless.
285          */
286         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
287             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
288             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
289                 GOTO(out_free_sop_data, rc = 0);
290
291         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
292          * to update them for split dir */
293         body = lustre_msg_buf((*reqp)->rq_repmsg,
294                               DLM_REPLY_REC_OFF, sizeof(*body));
295         LASSERT(body != NULL);
296
297         /* could not find object, FID is not present in response. */
298         if (!(body->valid & OBD_MD_FLID))
299                 GOTO(out_free_sop_data, rc = 0);
300
301         obj = lmv_obj_grab(obd, &body->fid1);
302         if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
303                 /* wow! this is split dir, we'd like to handle it */
304                 obj = lmv_obj_create(exp, &body->fid1, mea);
305                 if (IS_ERR(obj))
306                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
307         }
308
309         if (obj) {
310                 /* this is split dir and we'd want to get attrs */
311                 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
312                        PFID(&body->fid1));
313
314                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
315                                            cb_blocking, extra_lock_flags);
316         } else if (S_ISDIR(body->mode)) {
317                 CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
318                        PFID(&body->fid1));
319         }
320
321         if (obj)
322                 lmv_obj_put(obj);
323
324         EXIT;
325 out_free_sop_data:
326         OBD_FREE_PTR(sop_data);
327         return rc;
328 }
329
330 int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
331                        void *lmm, int lmmsize, struct lookup_intent *it,
332                        int flags, struct ptlrpc_request **reqp,
333                        ldlm_blocking_callback cb_blocking,
334                        int extra_lock_flags)
335 {
336         struct lmv_obj *obj = NULL, *obj2 = NULL;
337         struct obd_device *obd = exp->exp_obd;
338         struct lu_fid rpid = op_data->fid1;
339         struct lmv_obd *lmv = &obd->u.lmv;
340         struct mdt_body *body = NULL;
341         struct md_op_data *sop_data;
342         struct lmv_stripe_md *mea;
343         mdsno_t mds;
344         int rc = 0;
345         ENTRY;
346
347         OBD_ALLOC_PTR(sop_data);
348         if (sop_data == NULL)
349                 RETURN(-ENOMEM);
350         
351         /* save op_data fro repeat case */
352         *sop_data = *op_data;
353         
354         if (fid_is_sane(&op_data->fid2)) {
355                 /*
356                  * Caller wants to revalidate attrs of obj we have to revalidate
357                  * slaves if requested object is split directory.
358                  */
359                 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
360                        PFID(&op_data->fid2));
361                 
362                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
363                 if (rc)
364                         GOTO(out_free_sop_data, rc);
365 #if 0
366                 /*
367                  * In fact, we do not need this with current intent_lock(), but
368                  * it may change some day.
369                  */
370                 obj = lmv_obj_grab(obd, &op_data->fid2);
371                 if (obj) {
372                         if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
373                                 rpid = obj->lo_inodes[mds].li_fid;
374                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
375                                 if (rc) {
376                                         lmv_obj_put(obj);
377                                         GOTO(out_free_sop_data, rc);
378                                 }
379                         }
380                         lmv_obj_put(obj);
381                 }
382 #endif
383         } else {
384                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
385                        op_data->namelen, op_data->name,
386                        PFID(&op_data->fid1));
387                 
388                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
389                 if (rc)
390                         GOTO(out_free_sop_data, rc);
391                 obj = lmv_obj_grab(obd, &op_data->fid1);
392                 if (obj && op_data->namelen) {
393                         /* directory is already split. calculate mds */
394                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
395                                            (char *)op_data->name,
396                                            op_data->namelen);
397                         
398                         rpid = obj->lo_inodes[mds].li_fid;
399                         rc = lmv_fld_lookup(lmv, &rpid, &mds);
400                         if (rc) {
401                                 lmv_obj_put(obj);
402                                 GOTO(out_free_sop_data, rc);
403                         }
404                         lmv_obj_put(obj);
405
406                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
407                                mds, PFID(&rpid));
408                 }
409         }
410
411         sop_data->fid1 = rpid;
412
413         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
414                             lmmsize, it, flags, reqp, cb_blocking,
415                             extra_lock_flags);
416         if (rc < 0)
417                 GOTO(out_free_sop_data, rc);
418
419         if (obj && rc > 0) {
420                 /*
421                  * This is split dir. In order to optimize things a bit, we
422                  * consider obj valid updating missing parts.
423
424                  * FIXME: do we need to return any lock here? It would be fine
425                  * if we don't. This means that nobody should use UPDATE lock to
426                  * notify about object * removal.
427                  */
428                 CDEBUG(D_OTHER,
429                        "revalidate slaves for "DFID", rc %d\n",
430                        PFID(&op_data->fid2), rc);
431
432                 LASSERT(fid_is_sane(&op_data->fid2));
433                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->fid2, it, rc,
434                                            cb_blocking, extra_lock_flags);
435                 GOTO(out_free_sop_data, rc);
436         }
437
438         if (*reqp == NULL)
439                 GOTO(out_free_sop_data, rc);
440
441         /*
442          * okay, MDS has returned success. Probably name has been resolved in
443          * remote inode.
444          */
445         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
446                                reqp, cb_blocking, extra_lock_flags);
447         if (rc < 0)
448                 GOTO(out_free_sop_data, rc);
449
450         /*
451          * Nothing is found, do not access body->fid1 as it is zero and thus
452          * pointless.
453          */
454         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
455                 GOTO(out_free_sop_data, rc = 0);
456
457         LASSERT(*reqp);
458         LASSERT((*reqp)->rq_repmsg);
459         body = lustre_msg_buf((*reqp)->rq_repmsg,
460                               DLM_REPLY_REC_OFF, sizeof(*body));
461         LASSERT(body != NULL);
462
463         /* could not find object, FID is not present in response. */
464         if (!(body->valid & OBD_MD_FLID))
465                 GOTO(out_free_sop_data, rc = 0);
466
467         obj2 = lmv_obj_grab(obd, &body->fid1);
468
469         if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
470                 /* wow! this is split dir, we'd like to handle it. */
471                 obj2 = lmv_obj_create(exp, &body->fid1, mea);
472                 if (IS_ERR(obj2))
473                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
474         }
475
476         if (obj2) {
477                 /* this is split dir and we'd want to get attrs */
478                 CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
479                        PFID(&body->fid1), rc);
480
481                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
482                                            cb_blocking, extra_lock_flags);
483                 lmv_obj_put(obj2);
484         }
485
486         EXIT;
487 out_free_sop_data:
488         OBD_FREE_PTR(sop_data);
489         return rc;
490 }
491
492 void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
493 {
494         /* update size */
495         body->size += lino->li_size;
496 }
497
498 /* this is not used currently */
499 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
500 {
501         struct obd_device *obd = exp->exp_obd;
502         struct lmv_obd *lmv = &obd->u.lmv;
503         struct mdt_body *body = NULL;
504         struct lustre_handle *lockh;
505         struct md_op_data *op_data;
506         struct ldlm_lock *lock;
507         struct mdt_body *body2;
508         struct lmv_obj *obj;
509         int i, rc = 0;
510         ENTRY;
511
512         LASSERT(reqp);
513         LASSERT(*reqp);
514
515         /* master is locked. we'd like to take locks on slaves and update
516          * attributes to be returned from the slaves it's important that lookup
517          * is called in two cases:
518
519          *  - for first time (dcache has no such a resolving yet).  -
520          *  ->d_revalidate() returned false.
521
522          * last case possible only if all the objs (master and all slaves aren't
523          * valid */
524
525         body = lustre_msg_buf((*reqp)->rq_repmsg,
526                               DLM_REPLY_REC_OFF, sizeof(*body));
527         LASSERT(body != NULL);
528         LASSERT((body->valid & OBD_MD_FLID) != 0);
529
530         obj = lmv_obj_grab(obd, &body->fid1);
531         LASSERT(obj != NULL);
532
533         CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
534                PFID(&body->fid1));
535
536         OBD_ALLOC_PTR(op_data);
537         if (op_data == NULL)
538                 RETURN(-ENOMEM);
539         
540         lmv_obj_lock(obj);
541
542         for (i = 0; i < obj->lo_objcount; i++) {
543                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
544                 struct ptlrpc_request *req = NULL;
545                 struct obd_export *tgt_exp;
546                 struct lookup_intent it;
547
548                 if (lu_fid_eq(&fid, &obj->lo_fid))
549                         /* skip master obj */
550                         continue;
551
552                 CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
553
554                 /* is obj valid? */
555                 memset(&it, 0, sizeof(it));
556                 it.it_op = IT_GETATTR;
557
558                 memset(op_data, 0, sizeof(*op_data));
559                 op_data->fid1 = fid;
560                 op_data->fid2 = fid;
561
562                 tgt_exp = lmv_get_export(lmv, &fid);
563                 if (IS_ERR(tgt_exp))
564                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
565
566                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req,
567                                     lmv_blocking_ast, 0);
568
569                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
570                 if (rc > 0 && req == NULL) {
571                         /* nice, this slave is valid */
572                         LASSERT(req == NULL);
573                         CDEBUG(D_OTHER, "cached\n");
574                         goto release_lock;
575                 }
576
577                 if (rc < 0) {
578                         /* error during lookup */
579                         GOTO(cleanup, rc);
580                 }
581                 lock = ldlm_handle2lock(lockh);
582                 LASSERT(lock);
583
584                 lock->l_ast_data = lmv_obj_get(obj);
585
586                 body2 = lustre_msg_buf(req->rq_repmsg,
587                                        DLM_REPLY_REC_OFF, sizeof(*body2));
588                 LASSERT(body2);
589
590                 obj->lo_inodes[i].li_size = body2->size;
591
592                 CDEBUG(D_OTHER, "fresh: %lu\n",
593                        (unsigned long)obj->lo_inodes[i].li_size);
594
595                 LDLM_LOCK_PUT(lock);
596
597                 if (req)
598                         ptlrpc_req_finished(req);
599 release_lock:
600                 lmv_update_body(body, obj->lo_inodes + i);
601
602                 if (it.d.lustre.it_lock_mode)
603                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
604         }
605
606         EXIT;
607 cleanup:
608         OBD_FREE_PTR(op_data);
609         lmv_obj_unlock(obj);
610         lmv_obj_put(obj);
611         return rc;
612 }
613
614 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
615                       void *lmm, int lmmsize, struct lookup_intent *it,
616                       int flags, struct ptlrpc_request **reqp,
617                       ldlm_blocking_callback cb_blocking,
618                       int extra_lock_flags)
619 {
620         struct obd_device *obd = exp->exp_obd;
621         struct lu_fid rpid = op_data->fid1;
622         struct lmv_obd *lmv = &obd->u.lmv;
623         struct mdt_body *body = NULL;
624         struct md_op_data *sop_data;
625         struct lmv_stripe_md *mea;
626         struct lmv_obj *obj;
627         int rc, loop = 0;
628         mdsno_t mds;
629         ENTRY;
630
631         OBD_ALLOC_PTR(sop_data);
632         if (sop_data == NULL)
633                 RETURN(-ENOMEM);
634         
635         /* save op_data fro repeat case */
636         *sop_data = *op_data;
637         
638         /*
639          * IT_LOOKUP is intended to produce name -> fid resolving (let's call
640          * this lookup below) or to confirm requested resolving is still valid
641          * (let's call this revalidation) fid_is_sane(&sop_data->fid2) specifies
642          * revalidation.
643          */
644         if (fid_is_sane(&op_data->fid2)) {
645                 /*
646                  * This is revalidate: we have to check is LOOKUP lock still
647                  * valid for given fid. Very important part is that we have to
648                  * choose right mds because namespace is per mds.
649                  */
650                 rpid = op_data->fid1;
651                 obj = lmv_obj_grab(obd, &rpid);
652                 if (obj) {
653                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
654                                            (char *)op_data->name,
655                                            op_data->namelen);
656                         rpid = obj->lo_inodes[mds].li_fid;
657                         lmv_obj_put(obj);
658                 }
659                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
660                 if (rc)
661                         GOTO(out_free_sop_data, rc);
662
663                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
664                        PFID(&op_data->fid2), mds);
665         } else {
666                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
667                 if (rc)
668                         GOTO(out_free_sop_data, rc);
669 repeat:
670                 LASSERT(++loop <= 2);
671
672                 /*
673                  * This is lookup. During lookup we have to update all the
674                  * attributes, because returned values will be put in struct
675                  * inode.
676                  */
677                 obj = lmv_obj_grab(obd, &op_data->fid1);
678                 if (obj) {
679                         if (op_data->namelen) {
680                                 /* directory is already split. calculate mds */
681                                 mds = raw_name2idx(obj->lo_hashtype,
682                                                    obj->lo_objcount,
683                                                    (char *)op_data->name,
684                                                    op_data->namelen);
685                                 rpid = obj->lo_inodes[mds].li_fid;
686                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
687                                 if (rc) {
688                                         lmv_obj_put(obj);
689                                         GOTO(out_free_sop_data, rc);
690                                 }
691                         }
692                         lmv_obj_put(obj);
693                 }
694                 fid_zero(&op_data->fid2);
695         }
696
697         sop_data->fid1 = rpid;
698
699         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
700                             it, flags, reqp, cb_blocking, extra_lock_flags);
701         if (rc > 0) {
702                 LASSERT(fid_is_sane(&op_data->fid2));
703                 GOTO(out_free_sop_data, rc);
704         }
705         if (rc > 0) {
706                 /*
707                  * Very interesting. it seems object is still valid but for some
708                  * reason llite calls lookup, not revalidate.
709                  */
710                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
711                       PFID(&rpid));
712                 LASSERT(*reqp == NULL);
713                 GOTO(out_free_sop_data, rc);
714         }
715
716         if (rc == 0 && *reqp == NULL) {
717                 /* once again, we're asked for lookup, not revalidate */
718                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
719                       PFID(&rpid));
720                 GOTO(out_free_sop_data, rc);
721         }
722
723         if (rc == -ERESTART) {
724                 /*
725                  * Directory got split since last update. This shouldn't be
726                  * becasue splitting causes lock revocation, so revalidate had
727                  * to fail and lookup on dir had to return mea.
728                  */
729                 CWARN("we haven't knew about directory splitting!\n");
730                 LASSERT(obj == NULL);
731
732                 obj = lmv_obj_create(exp, &rpid, NULL);
733                 if (IS_ERR(obj))
734                         RETURN((int)PTR_ERR(obj));
735                 lmv_obj_put(obj);
736                 goto repeat;
737         }
738
739         if (rc < 0)
740                 GOTO(out_free_sop_data, rc);
741
742         /*
743          * Okay, MDS has returned success. Probably name has been resolved in
744          * remote inode.
745          */
746         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
747                                cb_blocking, extra_lock_flags);
748
749         if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
750                 /* wow! this is split dir, we'd like to handle it */
751                 body = lustre_msg_buf((*reqp)->rq_repmsg,
752                                       DLM_REPLY_REC_OFF, sizeof(*body));
753                 LASSERT(body != NULL);
754                 LASSERT((body->valid & OBD_MD_FLID) != 0);
755
756                 obj = lmv_obj_grab(obd, &body->fid1);
757                 if (!obj) {
758                         obj = lmv_obj_create(exp, &body->fid1, mea);
759                         if (IS_ERR(obj))
760                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
761                 }
762                 lmv_obj_put(obj);
763         }
764
765         EXIT;
766 out_free_sop_data:
767         OBD_FREE_PTR(sop_data);
768         return rc;
769 }
770
771 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
772                     void *lmm, int lmmsize, struct lookup_intent *it,
773                     int flags, struct ptlrpc_request **reqp,
774                     ldlm_blocking_callback cb_blocking,
775                     int extra_lock_flags)
776 {
777         struct obd_device *obd = exp->exp_obd;
778         int rc;
779         ENTRY;
780
781         LASSERT(it != NULL);
782         LASSERT(fid_is_sane(&op_data->fid1));
783         
784         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
785                LL_IT2STR(it), op_data->namelen, op_data->name,
786                PFID(&op_data->fid1));
787
788         rc = lmv_check_connect(obd);
789         if (rc)
790                 RETURN(rc);
791
792         if (it->it_op & IT_LOOKUP)
793                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
794                                        flags, reqp, cb_blocking,
795                                        extra_lock_flags);
796         else if (it->it_op & IT_OPEN)
797                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
798                                      flags, reqp, cb_blocking,
799                                      extra_lock_flags);
800         else if (it->it_op & IT_GETATTR)
801                 rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
802                                         flags, reqp, cb_blocking,
803                                         extra_lock_flags);
804         else
805                 LBUG();
806         RETURN(rc);
807 }
808
809 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
810                           const struct lu_fid *mid, struct lookup_intent *oit,
811                           int master_valid, ldlm_blocking_callback cb_blocking,
812                           int extra_lock_flags)
813 {
814         struct obd_device *obd = exp->exp_obd;
815         struct ptlrpc_request *mreq = *reqp;
816         struct lmv_obd *lmv = &obd->u.lmv;
817         struct lustre_handle master_lockh;
818         struct obd_export *tgt_exp;
819         struct md_op_data *op_data;
820         struct ldlm_lock *lock;
821         unsigned long size = 0;
822         struct mdt_body *body;
823         struct lmv_obj *obj;
824         int master_lock_mode;
825         int i, rc = 0;
826         ENTRY;
827
828         OBD_ALLOC_PTR(op_data);
829         if (op_data == NULL)
830                 RETURN(-ENOMEM);
831         
832         /* we have to loop over the subobjects, check validity and update them
833          * from MDSs if needed. it's very useful that we need not to update all
834          * the fields. say, common fields (that are equal on all the subojects
835          * need not to be update, another fields (i_size, for example) are
836          * cached all the time */
837         obj = lmv_obj_grab(obd, mid);
838         LASSERT(obj != NULL);
839
840         master_lock_mode = 0;
841
842         lmv_obj_lock(obj);
843
844         for (i = 0; i < obj->lo_objcount; i++) {
845                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
846                 struct lustre_handle *lockh = NULL;
847                 struct ptlrpc_request *req = NULL;
848                 ldlm_blocking_callback cb;
849                 struct lookup_intent it;
850                 int master = 0;
851
852                 CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
853                        PFID(&fid));
854
855                 memset(op_data, 0, sizeof(*op_data));
856                 memset(&it, 0, sizeof(it));
857                 it.it_op = IT_GETATTR;
858
859                 cb = lmv_blocking_ast;
860
861                 if (lu_fid_eq(&fid, &obj->lo_fid)) {
862                         if (master_valid) {
863                                 /* lmv_intent_getattr() already checked
864                                  * validness and took the lock */
865                                 if (mreq) {
866                                         /* it even got the reply refresh attrs
867                                          * from that reply */
868                                         body = lustre_msg_buf(mreq->rq_repmsg,
869                                                               DLM_REPLY_REC_OFF, 
870                                                               sizeof(*body));
871                                         LASSERT(body != NULL);
872                                         goto update;
873                                 }
874                                 /* take already cached attrs into account */
875                                 CDEBUG(D_OTHER,
876                                        "master is locked and cached\n");
877                                 goto release_lock;
878                         }
879                         master = 1;
880                         cb = cb_blocking;
881                 }
882
883                 op_data->fid1 = fid;
884                 op_data->fid2 = fid;
885
886                 /* is obj valid? */
887                 tgt_exp = lmv_get_export(lmv, &fid);
888                 if (IS_ERR(tgt_exp))
889                         GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp));
890
891                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
892                                     extra_lock_flags);
893                 
894                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
895                 if (rc > 0 && req == NULL) {
896                         /* nice, this slave is valid */
897                         LASSERT(req == NULL);
898                         CDEBUG(D_OTHER, "cached\n");
899                         goto release_lock;
900                 }
901
902                 if (rc < 0) {
903                         /* error during revalidation */
904                         GOTO(cleanup, rc);
905                 }
906                 if (master) {
907                         LASSERT(master_valid == 0);
908                         /* save lock on master to be returned to the caller */
909                         CDEBUG(D_OTHER, "no lock on master yet\n");
910                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
911                         master_lock_mode = it.d.lustre.it_lock_mode;
912                         it.d.lustre.it_lock_mode = 0;
913                 } else {
914                         /* this is slave. we want to control it */
915                         lock = ldlm_handle2lock(lockh);
916                         LASSERT(lock);
917                         lock->l_ast_data = lmv_obj_get(obj);
918                         LDLM_LOCK_PUT(lock);
919                 }
920
921                 if (*reqp == NULL) {
922                         /* this is first reply, we'll use it to return updated
923                          * data back to the caller */
924                         LASSERT(req);
925                         ptlrpc_request_addref(req);
926                         *reqp = req;
927
928                 }
929
930                 body = lustre_msg_buf(req->rq_repmsg,
931                                       DLM_REPLY_REC_OFF, sizeof(*body));
932                 LASSERT(body);
933
934 update:
935                 obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) * 
936                         (i + 1);
937
938                 CDEBUG(D_OTHER, "fresh: %lu\n",
939                        (unsigned long)obj->lo_inodes[i].li_size);
940
941                 if (req)
942                         ptlrpc_req_finished(req);
943 release_lock:
944                 size += obj->lo_inodes[i].li_size;
945
946                 if (it.d.lustre.it_lock_mode)
947                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
948         }
949
950         if (*reqp) {
951                 /* some attrs got refreshed, we have reply and it's time to put
952                  * fresh attrs to it */
953                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
954                        (unsigned long)size);
955
956                 body = lustre_msg_buf((*reqp)->rq_repmsg, 
957                                       DLM_REPLY_REC_OFF, sizeof(*body));
958                 LASSERT(body);
959
960                 body->size = size;
961
962                 if (mreq == NULL) {
963                         /*
964                          * very important to maintain mds num the same because
965                          * of revalidation. mreq == NULL means that caller has
966                          * no reply and the only attr we can return is size.
967                          */
968                         body->valid = OBD_MD_FLSIZE;
969                         
970 #if 0
971                         rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds);
972                         if (rc)
973                                 GOTO(cleanup, rc);
974 #endif
975                 }
976                 if (master_valid == 0) {
977                         memcpy(&oit->d.lustre.it_lock_handle,
978                                &master_lockh, sizeof(master_lockh));
979                         oit->d.lustre.it_lock_mode = master_lock_mode;
980                 }
981                 rc = 0;
982         } else {
983                 /* it seems all the attrs are fresh and we did no request */
984                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
985                 if (master_valid == 0)
986                         oit->d.lustre.it_lock_mode = master_lock_mode;
987                 rc = 1;
988         }
989
990         EXIT;
991 cleanup:
992         lmv_obj_unlock(obj);
993         lmv_obj_put(obj);
994 out_free_op_data:
995         OBD_FREE_PTR(op_data);
996         return rc;
997 }