Whamcloud - gitweb
LU-2675 obd: decruft md_enqueue() and md_intent_lock()
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Bundle of three pointers: an export, an md enqueue descriptor and an
 * ldlm enqueue descriptor.
 * NOTE(review): presumably the context of a getattr enqueue, going by the
 * name; no user of this struct is visible in this part of the file.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export */
        struct md_enqueue_info *ga_minfo;       /* md enqueue descriptor */
        struct ldlm_enqueue_info *ga_einfo;     /* ldlm enqueue descriptor */
};
60
61 int it_open_error(int phase, struct lookup_intent *it)
62 {
63         if (it_disposition(it, DISP_OPEN_LEASE)) {
64                 if (phase >= DISP_OPEN_LEASE)
65                         return it->d.lustre.it_status;
66                 else
67                         return 0;
68         }
69         if (it_disposition(it, DISP_OPEN_OPEN)) {
70                 if (phase >= DISP_OPEN_OPEN)
71                         return it->d.lustre.it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_OPEN_CREATE)) {
77                 if (phase >= DISP_OPEN_CREATE)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84                 if (phase >= DISP_LOOKUP_EXECD)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_IT_EXECD)) {
91                 if (phase >= DISP_IT_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97                it->d.lustre.it_status);
98         LBUG();
99         return 0;
100 }
101 EXPORT_SYMBOL(it_open_error);
102
103 /* this must be called on a lockh that is known to have a referenced lock */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
105                       __u64 *bits)
106 {
107         struct ldlm_lock *lock;
108         struct inode *new_inode = data;
109         ENTRY;
110
111         if(bits)
112                 *bits = 0;
113
114         if (!*lockh)
115                 RETURN(0);
116
117         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
118
119         LASSERT(lock != NULL);
120         lock_res_and_lock(lock);
121 #ifdef __KERNEL__
122         if (lock->l_resource->lr_lvb_inode &&
123             lock->l_resource->lr_lvb_inode != data) {
124                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125                 LASSERTF(old_inode->i_state & I_FREEING,
126                          "Found existing inode %p/%lu/%u state %lu in lock: "
127                          "setting data to %p/%lu/%u\n", old_inode,
128                          old_inode->i_ino, old_inode->i_generation,
129                          old_inode->i_state,
130                          new_inode, new_inode->i_ino, new_inode->i_generation);
131         }
132 #endif
133         lock->l_resource->lr_lvb_inode = new_inode;
134         if (bits)
135                 *bits = lock->l_policy_data.l_inodebits.bits;
136
137         unlock_res_and_lock(lock);
138         LDLM_LOCK_PUT(lock);
139
140         RETURN(0);
141 }
142
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144                            const struct lu_fid *fid, ldlm_type_t type,
145                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
146                            struct lustre_handle *lockh)
147 {
148         struct ldlm_res_id res_id;
149         ldlm_mode_t rc;
150         ENTRY;
151
152         fid_build_reg_res_name(fid, &res_id);
153         /* LU-4405: Clear bits not supported by server */
154         policy->l_inodebits.bits &= exp_connect_ibits(exp);
155         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156                              &res_id, type, policy, mode, lockh, 0);
157         RETURN(rc);
158 }
159
160 int mdc_cancel_unused(struct obd_export *exp,
161                       const struct lu_fid *fid,
162                       ldlm_policy_data_t *policy,
163                       ldlm_mode_t mode,
164                       ldlm_cancel_flags_t flags,
165                       void *opaque)
166 {
167         struct ldlm_res_id res_id;
168         struct obd_device *obd = class_exp2obd(exp);
169         int rc;
170
171         ENTRY;
172
173         fid_build_reg_res_name(fid, &res_id);
174         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175                                              policy, mode, flags, opaque);
176         RETURN(rc);
177 }
178
179 int mdc_null_inode(struct obd_export *exp,
180                    const struct lu_fid *fid)
181 {
182         struct ldlm_res_id res_id;
183         struct ldlm_resource *res;
184         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
185         ENTRY;
186
187         LASSERTF(ns != NULL, "no namespace passed\n");
188
189         fid_build_reg_res_name(fid, &res_id);
190
191         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
192         if(res == NULL)
193                 RETURN(0);
194
195         lock_res(res);
196         res->lr_lvb_inode = NULL;
197         unlock_res(res);
198
199         ldlm_resource_putref(res);
200         RETURN(0);
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *
267 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
268                      struct md_op_data *op_data)
269 {
270         struct ptlrpc_request   *req;
271         struct obd_device       *obddev = class_exp2obd(exp);
272         struct ldlm_intent      *lit;
273         const void              *lmm = op_data->op_data;
274         int                      lmmsize = op_data->op_data_size;
275         struct list_head         cancels;
276         int count = 0;
277         int mode;
278         int rc;
279         ENTRY;
280
281         INIT_LIST_HEAD(&cancels);
282
283         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284
285         /* XXX: openlock is not cancelled for cross-refs. */
286         /* If inode is known, cancel conflicting OPEN locks. */
287         if (fid_is_sane(&op_data->op_fid2)) {
288                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
289                         if (it->it_flags & FMODE_WRITE)
290                                 mode = LCK_EX;
291                         else
292                                 mode = LCK_PR;
293                 } else {
294                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295                                 mode = LCK_CW;
296 #ifdef FMODE_EXEC
297                         else if (it->it_flags & FMODE_EXEC)
298                                 mode = LCK_PR;
299 #endif
300                         else
301                                 mode = LCK_CR;
302                 }
303                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
304                                                 &cancels, mode,
305                                                 MDS_INODELOCK_OPEN);
306         }
307
308         /* If CREATE, cancel parent's UPDATE lock. */
309         if (it->it_op & IT_CREAT)
310                 mode = LCK_EX;
311         else
312                 mode = LCK_CR;
313         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
314                                          &cancels, mode,
315                                          MDS_INODELOCK_UPDATE);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
318                                    &RQF_LDLM_INTENT_OPEN);
319         if (req == NULL) {
320                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
321                 RETURN(ERR_PTR(-ENOMEM));
322         }
323
324         /* parent capability */
325         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
326         /* child capability, reserve the size according to parent capa, it will
327          * be filled after we get the reply */
328         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
329
330         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
331                              op_data->op_namelen + 1);
332         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
333                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
334
335         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
336         if (rc < 0) {
337                 ptlrpc_request_free(req);
338                 RETURN(ERR_PTR(rc));
339         }
340
341         spin_lock(&req->rq_lock);
342         req->rq_replay = req->rq_import->imp_replayable;
343         spin_unlock(&req->rq_lock);
344
345         /* pack the intent */
346         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
347         lit->opc = (__u64)it->it_op;
348
349         /* pack the intended request */
350         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
351                       lmmsize);
352
353         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
354                              obddev->u.cli.cl_max_mds_easize);
355
356         /* for remote client, fetch remote perm for current user */
357         if (client_is_remote(exp))
358                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359                                      sizeof(struct mdt_remote_perm));
360         ptlrpc_request_set_replen(req);
361         return req;
362 }
363
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366                          struct lookup_intent *it,
367                          struct md_op_data *op_data)
368 {
369         struct ptlrpc_request   *req;
370         struct ldlm_intent      *lit;
371         int                     rc, count = 0, maxdata;
372         CFS_LIST_HEAD(cancels);
373
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377                                         &RQF_LDLM_INTENT_GETXATTR);
378         if (req == NULL)
379                 RETURN(ERR_PTR(-ENOMEM));
380
381         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
382
383         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(ERR_PTR(rc));
387         }
388
389         /* pack the intent */
390         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391         lit->opc = IT_GETXATTR;
392
393         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
394
395         /* pack the intended request */
396         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397                         op_data->op_valid, maxdata, -1, 0);
398
399         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400                                 RCL_SERVER, maxdata);
401
402         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403                                 RCL_SERVER, maxdata);
404
405         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406                                 RCL_SERVER, maxdata);
407
408         ptlrpc_request_set_replen(req);
409
410         RETURN(req);
411 }
412
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414                                                      struct lookup_intent *it,
415                                                      struct md_op_data *op_data)
416 {
417         struct ptlrpc_request *req;
418         struct obd_device     *obddev = class_exp2obd(exp);
419         struct ldlm_intent    *lit;
420         int                    rc;
421         ENTRY;
422
423         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
424                                    &RQF_LDLM_INTENT_UNLINK);
425         if (req == NULL)
426                 RETURN(ERR_PTR(-ENOMEM));
427
428         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
429         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
430                              op_data->op_namelen + 1);
431
432         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
433         if (rc) {
434                 ptlrpc_request_free(req);
435                 RETURN(ERR_PTR(rc));
436         }
437
438         /* pack the intent */
439         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
440         lit->opc = (__u64)it->it_op;
441
442         /* pack the intended request */
443         mdc_unlink_pack(req, op_data);
444
445         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
446                              obddev->u.cli.cl_default_mds_easize);
447         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
448                              obddev->u.cli.cl_default_mds_cookiesize);
449         ptlrpc_request_set_replen(req);
450         RETURN(req);
451 }
452
453 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
454                                                       struct lookup_intent *it,
455                                                       struct md_op_data *op_data)
456 {
457         struct ptlrpc_request *req;
458         struct obd_device     *obddev = class_exp2obd(exp);
459         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
460                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
461                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
462                                        (client_is_remote(exp) ?
463                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
464         struct ldlm_intent    *lit;
465         int                    rc;
466         int                     easize;
467         ENTRY;
468
469         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
470                                    &RQF_LDLM_INTENT_GETATTR);
471         if (req == NULL)
472                 RETURN(ERR_PTR(-ENOMEM));
473
474         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
475         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
476                              op_data->op_namelen + 1);
477
478         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
479         if (rc) {
480                 ptlrpc_request_free(req);
481                 RETURN(ERR_PTR(rc));
482         }
483
484         /* pack the intent */
485         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
486         lit->opc = (__u64)it->it_op;
487
488         if (obddev->u.cli.cl_default_mds_easize > 0)
489                 easize = obddev->u.cli.cl_default_mds_easize;
490         else
491                 easize = obddev->u.cli.cl_max_mds_easize;
492
493         /* pack the intended request */
494         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
495
496         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
497         if (client_is_remote(exp))
498                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
499                                      sizeof(struct mdt_remote_perm));
500         ptlrpc_request_set_replen(req);
501         RETURN(req);
502 }
503
504 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
505                                                      struct lookup_intent *it,
506                                                      struct md_op_data *unused)
507 {
508         struct obd_device     *obd = class_exp2obd(exp);
509         struct ptlrpc_request *req;
510         struct ldlm_intent    *lit;
511         struct layout_intent  *layout;
512         int rc;
513         ENTRY;
514
515         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
516                                 &RQF_LDLM_INTENT_LAYOUT);
517         if (req == NULL)
518                 RETURN(ERR_PTR(-ENOMEM));
519
520         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
521         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(ERR_PTR(rc));
525         }
526
527         /* pack the intent */
528         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
529         lit->opc = (__u64)it->it_op;
530
531         /* pack the layout intent request */
532         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
533         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
534          * set for replication */
535         layout->li_opc = LAYOUT_INTENT_ACCESS;
536
537         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
538                              obd->u.cli.cl_default_mds_easize);
539         ptlrpc_request_set_replen(req);
540         RETURN(req);
541 }
542
543 static struct ptlrpc_request *
544 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
545 {
546         struct ptlrpc_request *req;
547         int rc;
548         ENTRY;
549
550         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
551         if (req == NULL)
552                 RETURN(ERR_PTR(-ENOMEM));
553
554         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
555         if (rc) {
556                 ptlrpc_request_free(req);
557                 RETURN(ERR_PTR(rc));
558         }
559
560         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
561         ptlrpc_request_set_replen(req);
562         RETURN(req);
563 }
564
565 static int mdc_finish_enqueue(struct obd_export *exp,
566                               struct ptlrpc_request *req,
567                               struct ldlm_enqueue_info *einfo,
568                               struct lookup_intent *it,
569                               struct lustre_handle *lockh,
570                               int rc)
571 {
572         struct req_capsule  *pill = &req->rq_pill;
573         struct ldlm_request *lockreq;
574         struct ldlm_reply   *lockrep;
575         struct lustre_intent_data *intent = &it->d.lustre;
576         struct ldlm_lock    *lock;
577         void                *lvb_data = NULL;
578         int                  lvb_len = 0;
579         ENTRY;
580
581         LASSERT(rc >= 0);
582         /* Similarly, if we're going to replay this request, we don't want to
583          * actually get a lock, just perform the intent. */
584         if (req->rq_transno || req->rq_replay) {
585                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
586                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
587         }
588
589         if (rc == ELDLM_LOCK_ABORTED) {
590                 einfo->ei_mode = 0;
591                 memset(lockh, 0, sizeof(*lockh));
592                 rc = 0;
593         } else { /* rc = 0 */
594                 lock = ldlm_handle2lock(lockh);
595                 LASSERT(lock != NULL);
596
597                 /* If the server gave us back a different lock mode, we should
598                  * fix up our variables. */
599                 if (lock->l_req_mode != einfo->ei_mode) {
600                         ldlm_lock_addref(lockh, lock->l_req_mode);
601                         ldlm_lock_decref(lockh, einfo->ei_mode);
602                         einfo->ei_mode = lock->l_req_mode;
603                 }
604                 LDLM_LOCK_PUT(lock);
605         }
606
607         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
608         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
609
610         intent->it_disposition = (int)lockrep->lock_policy_res1;
611         intent->it_status = (int)lockrep->lock_policy_res2;
612         intent->it_lock_mode = einfo->ei_mode;
613         intent->it_lock_handle = lockh->cookie;
614         intent->it_data = req;
615
616         /* Technically speaking rq_transno must already be zero if
617          * it_status is in error, so the check is a bit redundant */
618         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
619                 mdc_clear_replay_flag(req, intent->it_status);
620
621         /* If we're doing an IT_OPEN which did not result in an actual
622          * successful open, then we need to remove the bit which saves
623          * this request for unconditional replay.
624          *
625          * It's important that we do this first!  Otherwise we might exit the
626          * function without doing so, and try to replay a failed create
627          * (bug 3440) */
628         if (it->it_op & IT_OPEN && req->rq_replay &&
629             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
630                 mdc_clear_replay_flag(req, intent->it_status);
631
632         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
633                   it->it_op, intent->it_disposition, intent->it_status);
634
635         /* We know what to expect, so we do any byte flipping required here */
636         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
637                 struct mdt_body *body;
638
639                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
640                 if (body == NULL) {
641                         CERROR ("Can't swab mdt_body\n");
642                         RETURN (-EPROTO);
643                 }
644
645                 if (it_disposition(it, DISP_OPEN_OPEN) &&
646                     !it_open_error(DISP_OPEN_OPEN, it)) {
647                         /*
648                          * If this is a successful OPEN request, we need to set
649                          * replay handler and data early, so that if replay
650                          * happens immediately after swabbing below, new reply
651                          * is swabbed by that handler correctly.
652                          */
653                         mdc_set_open_replay_data(NULL, NULL, it);
654                 }
655
656                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
657                         void *eadata;
658
659                         mdc_update_max_ea_from_body(exp, body);
660
661                         /*
662                          * The eadata is opaque; just check that it is there.
663                          * Eventually, obd_unpackmd() will check the contents.
664                          */
665                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
666                                                               body->eadatasize);
667                         if (eadata == NULL)
668                                 RETURN(-EPROTO);
669
670                         /* save lvb data and length in case this is for layout
671                          * lock */
672                         lvb_data = eadata;
673                         lvb_len = body->eadatasize;
674
675                         /*
676                          * We save the reply LOV EA in case we have to replay a
677                          * create for recovery.  If we didn't allocate a large
678                          * enough request buffer above we need to reallocate it
679                          * here to hold the actual LOV EA.
680                          *
681                          * To not save LOV EA if request is not going to replay
682                          * (for example error one).
683                          */
684                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
685                                 void *lmm;
686                                 if (req_capsule_get_size(pill, &RMF_EADATA,
687                                                          RCL_CLIENT) <
688                                     body->eadatasize)
689                                         mdc_realloc_openmsg(req, body);
690                                 else
691                                         req_capsule_shrink(pill, &RMF_EADATA,
692                                                            body->eadatasize,
693                                                            RCL_CLIENT);
694
695                                 req_capsule_set_size(pill, &RMF_EADATA,
696                                                      RCL_CLIENT,
697                                                      body->eadatasize);
698
699                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
700                                 if (lmm)
701                                         memcpy(lmm, eadata, body->eadatasize);
702                         }
703                 }
704
705                 if (body->valid & OBD_MD_FLRMTPERM) {
706                         struct mdt_remote_perm *perm;
707
708                         LASSERT(client_is_remote(exp));
709                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
710                                                 lustre_swab_mdt_remote_perm);
711                         if (perm == NULL)
712                                 RETURN(-EPROTO);
713                 }
714                 if (body->valid & OBD_MD_FLMDSCAPA) {
715                         struct lustre_capa *capa, *p;
716
717                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
718                         if (capa == NULL)
719                                 RETURN(-EPROTO);
720
721                         if (it->it_op & IT_OPEN) {
722                                 /* client fid capa will be checked in replay */
723                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
724                                 LASSERT(p);
725                                 *p = *capa;
726                         }
727                 }
728                 if (body->valid & OBD_MD_FLOSSCAPA) {
729                         struct lustre_capa *capa;
730
731                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
732                         if (capa == NULL)
733                                 RETURN(-EPROTO);
734                 }
735         } else if (it->it_op & IT_LAYOUT) {
736                 /* maybe the lock was granted right away and layout
737                  * is packed into RMF_DLM_LVB of req */
738                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
739                 if (lvb_len > 0) {
740                         lvb_data = req_capsule_server_sized_get(pill,
741                                                         &RMF_DLM_LVB, lvb_len);
742                         if (lvb_data == NULL)
743                                 RETURN(-EPROTO);
744                 }
745         }
746
747         /* fill in stripe data for layout lock */
748         lock = ldlm_handle2lock(lockh);
749         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
750                 void *lmm;
751
752                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
753                         ldlm_it2str(it->it_op), lvb_len);
754
755                 OBD_ALLOC_LARGE(lmm, lvb_len);
756                 if (lmm == NULL) {
757                         LDLM_LOCK_PUT(lock);
758                         RETURN(-ENOMEM);
759                 }
760                 memcpy(lmm, lvb_data, lvb_len);
761
762                 /* install lvb_data */
763                 lock_res_and_lock(lock);
764                 if (lock->l_lvb_data == NULL) {
765                         lock->l_lvb_type = LVB_T_LAYOUT;
766                         lock->l_lvb_data = lmm;
767                         lock->l_lvb_len = lvb_len;
768                         lmm = NULL;
769                 }
770                 unlock_res_and_lock(lock);
771                 if (lmm != NULL)
772                         OBD_FREE_LARGE(lmm, lvb_len);
773         }
774         if (lock != NULL)
775                 LDLM_LOCK_PUT(lock);
776
777         RETURN(rc);
778 }
779
780 /* We always reserve enough space in the reply packet for a stripe MD, because
781  * we don't know in advance the file type. */
782 int mdc_enqueue(struct obd_export *exp,
783                 struct ldlm_enqueue_info *einfo,
784                 const union ldlm_policy_data *policy,
785                 struct lookup_intent *it, struct md_op_data *op_data,
786                 struct lustre_handle *lockh, __u64 extra_lock_flags)
787 {
788         struct obd_device     *obddev = class_exp2obd(exp);
789         struct ptlrpc_request *req = NULL;
790         __u64                  flags, saved_flags = extra_lock_flags;
791         int                    rc;
792         struct ldlm_res_id res_id;
793         static const ldlm_policy_data_t lookup_policy =
794                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
795         static const ldlm_policy_data_t update_policy =
796                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
797         static const ldlm_policy_data_t layout_policy =
798                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
799         static const ldlm_policy_data_t getxattr_policy = {
800                               .l_inodebits = { MDS_INODELOCK_XATTR } };
801         int                    generation, resends = 0;
802         struct ldlm_reply     *lockrep;
803         enum lvb_type          lvb_type = 0;
804         ENTRY;
805
806         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
807                  einfo->ei_type);
808         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
809
810         if (it != NULL) {
811                 LASSERT(policy == NULL);
812
813                 saved_flags |= LDLM_FL_HAS_INTENT;
814                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
815                         policy = &update_policy;
816                 else if (it->it_op & IT_LAYOUT)
817                         policy = &layout_policy;
818                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
819                         policy = &getxattr_policy;
820                 else
821                         policy = &lookup_policy;
822         }
823
824         generation = obddev->u.cli.cl_import->imp_generation;
825 resend:
826         flags = saved_flags;
827         if (it == NULL) {
828                 /* The only way right now is FLOCK. */
829                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
830                          einfo->ei_type);
831                 res_id.name[3] = LDLM_FLOCK;
832         } else if (it->it_op & IT_OPEN) {
833                 LASSERT(einfo->ei_cbdata == NULL);
834                 req = mdc_intent_open_pack(exp, it, op_data);
835         } else if (it->it_op & IT_UNLINK) {
836                 req = mdc_intent_unlink_pack(exp, it, op_data);
837         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
838                 req = mdc_intent_getattr_pack(exp, it, op_data);
839         } else if (it->it_op & IT_READDIR) {
840                 req = mdc_enqueue_pack(exp, 0);
841         } else if (it->it_op & IT_LAYOUT) {
842                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
843                         RETURN(-EOPNOTSUPP);
844                 req = mdc_intent_layout_pack(exp, it, op_data);
845                 lvb_type = LVB_T_LAYOUT;
846         } else if (it->it_op & IT_GETXATTR) {
847                 req = mdc_intent_getxattr_pack(exp, it, op_data);
848         } else {
849                 LBUG();
850                 RETURN(-EINVAL);
851         }
852
853         if (IS_ERR(req))
854                 RETURN(PTR_ERR(req));
855
856         if (req != NULL && it && it->it_op & IT_CREAT)
857                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
858                  * retry logic */
859                 req->rq_no_retry_einprogress = 1;
860
861         if (resends) {
862                 req->rq_generation_set = 1;
863                 req->rq_import_generation = generation;
864                 req->rq_sent = cfs_time_current_sec() + resends;
865         }
866
867         /* It is important to obtain rpc_lock first (if applicable), so that
868          * threads that are serialised with rpc_lock are not polluting our
869          * rpcs in flight counter. We do not do flock request limiting, though*/
870         if (it) {
871                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
872                 rc = obd_get_request_slot(&obddev->u.cli);
873                 if (rc != 0) {
874                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
875                         mdc_clear_replay_flag(req, 0);
876                         ptlrpc_req_finished(req);
877                         RETURN(rc);
878                 }
879         }
880
881         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
882                               0, lvb_type, lockh, 0);
883         if (!it) {
884                 /* For flock requests we immediatelly return without further
885                    delay and let caller deal with the rest, since rest of
886                    this function metadata processing makes no sense for flock
887                    requests anyway. But in case of problem during comms with
888                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
889                    can not rely on caller and this mainly for F_UNLCKs
890                    (explicits or automatically generated by Kernel to clean
891                    current FLocks upon exit) that can't be trashed */
892                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
893                     (einfo->ei_type == LDLM_FLOCK) &&
894                     (einfo->ei_mode == LCK_NL))
895                         goto resend;
896                 RETURN(rc);
897         }
898
899         obd_put_request_slot(&obddev->u.cli);
900         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
901
902         if (rc < 0) {
903                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
904                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
905                              obddev->obd_name, rc);
906
907                 mdc_clear_replay_flag(req, rc);
908                 ptlrpc_req_finished(req);
909                 RETURN(rc);
910         }
911
912         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
913         LASSERT(lockrep != NULL);
914
915         lockrep->lock_policy_res2 =
916                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
917
918         /* Retry the create infinitely when we get -EINPROGRESS from
919          * server. This is required by the new quota design. */
920         if (it && it->it_op & IT_CREAT &&
921             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
922                 mdc_clear_replay_flag(req, rc);
923                 ptlrpc_req_finished(req);
924                 resends++;
925
926                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
927                        obddev->obd_name, resends, it->it_op,
928                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
929
930                 if (generation == obddev->u.cli.cl_import->imp_generation) {
931                         goto resend;
932                 } else {
933                         CDEBUG(D_HA, "resend cross eviction\n");
934                         RETURN(-EIO);
935                 }
936         }
937
938         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
939         if (rc < 0) {
940                 if (lustre_handle_is_used(lockh)) {
941                         ldlm_lock_decref(lockh, einfo->ei_mode);
942                         memset(lockh, 0, sizeof(*lockh));
943                 }
944                 ptlrpc_req_finished(req);
945         }
946         RETURN(rc);
947 }
948
949 static int mdc_finish_intent_lock(struct obd_export *exp,
950                                   struct ptlrpc_request *request,
951                                   struct md_op_data *op_data,
952                                   struct lookup_intent *it,
953                                   struct lustre_handle *lockh)
954 {
955         struct lustre_handle old_lock;
956         struct mdt_body *mdt_body;
957         struct ldlm_lock *lock;
958         int rc;
959         ENTRY;
960
961         LASSERT(request != NULL);
962         LASSERT(request != LP_POISON);
963         LASSERT(request->rq_repmsg != LP_POISON);
964
965         if (it->it_op & IT_READDIR)
966                 RETURN(0);
967
968         if (!it_disposition(it, DISP_IT_EXECD)) {
969                 /* The server failed before it even started executing the
970                  * intent, i.e. because it couldn't unpack the request. */
971                 LASSERT(it->d.lustre.it_status != 0);
972                 RETURN(it->d.lustre.it_status);
973         }
974         rc = it_open_error(DISP_IT_EXECD, it);
975         if (rc)
976                 RETURN(rc);
977
978         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
979         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
980
981         /* If we were revalidating a fid/name pair, mark the intent in
982          * case we fail and get called again from lookup */
983         if (fid_is_sane(&op_data->op_fid2) &&
984             it->it_create_mode & M_CHECK_STALE &&
985             it->it_op != IT_GETATTR) {
986                 /* Also: did we find the same inode? */
987                 /* sever can return one of two fids:
988                  * op_fid2 - new allocated fid - if file is created.
989                  * op_fid3 - existent fid - if file only open.
990                  * op_fid3 is saved in lmv_intent_open */
991                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
992                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
993                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
994                                "\n", PFID(&op_data->op_fid2),
995                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
996                         RETURN(-ESTALE);
997                 }
998         }
999
1000         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1001         if (rc)
1002                 RETURN(rc);
1003
1004         /* keep requests around for the multiple phases of the call
1005          * this shows the DISP_XX must guarantee we make it into the call
1006          */
1007         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1008             it_disposition(it, DISP_OPEN_CREATE) &&
1009             !it_open_error(DISP_OPEN_CREATE, it)) {
1010                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1011                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1012         }
1013         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1014             it_disposition(it, DISP_OPEN_OPEN) &&
1015             !it_open_error(DISP_OPEN_OPEN, it)) {
1016                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1017                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1018                 /* BUG 11546 - eviction in the middle of open rpc processing */
1019                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1020         }
1021
1022         if (it->it_op & IT_CREAT) {
1023                 /* XXX this belongs in ll_create_it */
1024         } else if (it->it_op == IT_OPEN) {
1025                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1026         } else {
1027                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1028         }
1029
1030         /* If we already have a matching lock, then cancel the new
1031          * one.  We have to set the data here instead of in
1032          * mdc_enqueue, because we need to use the child's inode as
1033          * the l_ast_data to match, and that's not available until
1034          * intent_finish has performed the iget().) */
1035         lock = ldlm_handle2lock(lockh);
1036         if (lock) {
1037                 ldlm_policy_data_t policy = lock->l_policy_data;
1038                 LDLM_DEBUG(lock, "matching against this");
1039
1040                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1041                                          &lock->l_resource->lr_name),
1042                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1043                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1044                 LDLM_LOCK_PUT(lock);
1045
1046                 memcpy(&old_lock, lockh, sizeof(*lockh));
1047                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1048                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1049                         ldlm_lock_decref_and_cancel(lockh,
1050                                                     it->d.lustre.it_lock_mode);
1051                         memcpy(lockh, &old_lock, sizeof(old_lock));
1052                         it->d.lustre.it_lock_handle = lockh->cookie;
1053                 }
1054         }
1055         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1056                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1057                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1058         RETURN(rc);
1059 }
1060
1061 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1062                         struct lu_fid *fid, __u64 *bits)
1063 {
1064         /* We could just return 1 immediately, but since we should only
1065          * be called in revalidate_it if we already have a lock, let's
1066          * verify that. */
1067         struct ldlm_res_id res_id;
1068         struct lustre_handle lockh;
1069         ldlm_policy_data_t policy;
1070         ldlm_mode_t mode;
1071         ENTRY;
1072
1073         if (it->d.lustre.it_lock_handle) {
1074                 lockh.cookie = it->d.lustre.it_lock_handle;
1075                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1076         } else {
1077                 fid_build_reg_res_name(fid, &res_id);
1078                 switch (it->it_op) {
1079                 case IT_GETATTR:
1080                         /* File attributes are held under multiple bits:
1081                          * nlink is under lookup lock, size and times are
1082                          * under UPDATE lock and recently we've also got
1083                          * a separate permissions lock for owner/group/acl that
1084                          * were protected by lookup lock before.
1085                          * Getattr must provide all of that information,
1086                          * so we need to ensure we have all of those locks.
1087                          * Unfortunately, if the bits are split across multiple
1088                          * locks, there's no easy way to match all of them here,
1089                          * so an extra RPC would be performed to fetch all
1090                          * of those bits at once for now. */
1091                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1092                          * but for old MDTs (< 2.4), permission is covered
1093                          * by LOOKUP lock, so it needs to match all bits here.*/
1094                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1095                                                   MDS_INODELOCK_LOOKUP |
1096                                                   MDS_INODELOCK_PERM;
1097                         break;
1098                 case IT_READDIR:
1099                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1100                         break;
1101                 case IT_LAYOUT:
1102                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1103                         break;
1104                 default:
1105                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1106                         break;
1107                 }
1108
1109                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1110                                       LDLM_IBITS, &policy,
1111                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1112                                       &lockh);
1113         }
1114
1115         if (mode) {
1116                 it->d.lustre.it_lock_handle = lockh.cookie;
1117                 it->d.lustre.it_lock_mode = mode;
1118         } else {
1119                 it->d.lustre.it_lock_handle = 0;
1120                 it->d.lustre.it_lock_mode = 0;
1121         }
1122
1123         RETURN(!!mode);
1124 }
1125
1126 /*
1127  * This long block is all about fixing up the lock and request state
1128  * so that it is correct as of the moment _before_ the operation was
1129  * applied; that way, the VFS will think that everything is normal and
1130  * call Lustre's regular VFS methods.
1131  *
1132  * If we're performing a creation, that means that unless the creation
1133  * failed with EEXIST, we should fake up a negative dentry.
1134  *
1135  * For everything else, we want to lookup to succeed.
1136  *
1137  * One additional note: if CREATE or OPEN succeeded, we add an extra
1138  * reference to the request because we need to keep it around until
1139  * ll_create/ll_open gets called.
1140  *
1141  * The server will return to us, in it_disposition, an indication of
1142  * exactly what d.lustre.it_status refers to.
1143  *
1144  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1145  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1146  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1147  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1148  * was successful.
1149  *
1150  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1151  * child lookup.
1152  */
1153 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1154                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1155                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1156 {
1157         struct ldlm_enqueue_info einfo = {
1158                 .ei_type        = LDLM_IBITS,
1159                 .ei_mode        = it_to_lock_mode(it),
1160                 .ei_cb_bl       = cb_blocking,
1161                 .ei_cb_cp       = ldlm_completion_ast,
1162         };
1163         struct lustre_handle lockh;
1164         int rc = 0;
1165         ENTRY;
1166         LASSERT(it);
1167
1168         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1169                 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1170                 op_data->op_name, PFID(&op_data->op_fid2),
1171                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1172                 it->it_flags);
1173
1174         lockh.cookie = 0;
1175         if (fid_is_sane(&op_data->op_fid2) &&
1176             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1177                 /* We could just return 1 immediately, but since we should only
1178                  * be called in revalidate_it if we already have a lock, let's
1179                  * verify that. */
1180                 it->d.lustre.it_lock_handle = 0;
1181                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1182                 /* Only return failure if it was not GETATTR by cfid
1183                    (from inode_revalidate) */
1184                 if (rc || op_data->op_namelen != 0)
1185                         RETURN(rc);
1186         }
1187
1188         /* For case if upper layer did not alloc fid, do it now. */
1189         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1190                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1191                 if (rc < 0) {
1192                         CERROR("Can't alloc new fid, rc %d\n", rc);
1193                         RETURN(rc);
1194                 }
1195         }
1196
1197         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1198                          extra_lock_flags);
1199         if (rc < 0)
1200                 RETURN(rc);
1201
1202         *reqp = it->d.lustre.it_data;
1203         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1204         RETURN(rc);
1205 }
1206
1207 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1208                                               struct ptlrpc_request *req,
1209                                               void *args, int rc)
1210 {
1211         struct mdc_getattr_args  *ga = args;
1212         struct obd_export        *exp = ga->ga_exp;
1213         struct md_enqueue_info   *minfo = ga->ga_minfo;
1214         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1215         struct lookup_intent     *it;
1216         struct lustre_handle     *lockh;
1217         struct obd_device        *obddev;
1218         struct ldlm_reply        *lockrep;
1219         __u64                     flags = LDLM_FL_HAS_INTENT;
1220         ENTRY;
1221
1222         it    = &minfo->mi_it;
1223         lockh = &minfo->mi_lockh;
1224
1225         obddev = class_exp2obd(exp);
1226
1227         obd_put_request_slot(&obddev->u.cli);
1228         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1229                 rc = -ETIMEDOUT;
1230
1231         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1232                                    &flags, NULL, 0, lockh, rc);
1233         if (rc < 0) {
1234                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1235                 mdc_clear_replay_flag(req, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1240         LASSERT(lockrep != NULL);
1241
1242         lockrep->lock_policy_res2 =
1243                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1244
1245         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1246         if (rc)
1247                 GOTO(out, rc);
1248
1249         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1250         EXIT;
1251
1252 out:
1253         OBD_FREE_PTR(einfo);
1254         minfo->mi_cb(req, minfo, rc);
1255         return 0;
1256 }
1257
1258 int mdc_intent_getattr_async(struct obd_export *exp,
1259                              struct md_enqueue_info *minfo,
1260                              struct ldlm_enqueue_info *einfo)
1261 {
1262         struct md_op_data       *op_data = &minfo->mi_data;
1263         struct lookup_intent    *it = &minfo->mi_it;
1264         struct ptlrpc_request   *req;
1265         struct mdc_getattr_args *ga;
1266         struct obd_device       *obddev = class_exp2obd(exp);
1267         struct ldlm_res_id       res_id;
1268         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1269          *     for statahead currently. Consider CMD in future, such two bits
1270          *     maybe managed by different MDS, should be adjusted then. */
1271         ldlm_policy_data_t       policy = {
1272                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1273                                                          MDS_INODELOCK_UPDATE }
1274                                  };
1275         int                      rc = 0;
1276         __u64                    flags = LDLM_FL_HAS_INTENT;
1277         ENTRY;
1278
1279         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1280                 LPF64"o\n",
1281                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1282                 ldlm_it2str(it->it_op), it->it_flags);
1283
1284         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1285         req = mdc_intent_getattr_pack(exp, it, op_data);
1286         if (IS_ERR(req))
1287                 RETURN(PTR_ERR(req));
1288
1289         rc = obd_get_request_slot(&obddev->u.cli);
1290         if (rc != 0) {
1291                 ptlrpc_req_finished(req);
1292                 RETURN(rc);
1293         }
1294
1295         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1296                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297         if (rc < 0) {
1298                 obd_put_request_slot(&obddev->u.cli);
1299                 ptlrpc_req_finished(req);
1300                 RETURN(rc);
1301         }
1302
1303         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1304         ga = ptlrpc_req_async_args(req);
1305         ga->ga_exp = exp;
1306         ga->ga_minfo = minfo;
1307         ga->ga_einfo = einfo;
1308
1309         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1310         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1311
1312         RETURN(0);
1313 }