Whamcloud - gitweb
Revert "LU-2675 obd: decruft md_enqueue() and md_intent_lock()"
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Bundle of the three objects that belong to one getattr enqueue.
 * NOTE(review): no user of this structure is visible in this part of the
 * file; presumably it is carried as the callback argument of an
 * asynchronous getattr enqueue -- confirm against the caller.
 */
struct mdc_getattr_args {
        /* export the request is issued through */
        struct obd_export        *ga_exp;
        /* metadata-level enqueue description */
        struct md_enqueue_info   *ga_minfo;
        /* LDLM-level enqueue description */
        struct ldlm_enqueue_info *ga_einfo;
};
60
61 int it_open_error(int phase, struct lookup_intent *it)
62 {
63         if (it_disposition(it, DISP_OPEN_LEASE)) {
64                 if (phase >= DISP_OPEN_LEASE)
65                         return it->d.lustre.it_status;
66                 else
67                         return 0;
68         }
69         if (it_disposition(it, DISP_OPEN_OPEN)) {
70                 if (phase >= DISP_OPEN_OPEN)
71                         return it->d.lustre.it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_OPEN_CREATE)) {
77                 if (phase >= DISP_OPEN_CREATE)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84                 if (phase >= DISP_LOOKUP_EXECD)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_IT_EXECD)) {
91                 if (phase >= DISP_IT_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97                it->d.lustre.it_status);
98         LBUG();
99         return 0;
100 }
101 EXPORT_SYMBOL(it_open_error);
102
103 /* this must be called on a lockh that is known to have a referenced lock */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
105                       __u64 *bits)
106 {
107         struct ldlm_lock *lock;
108         struct inode *new_inode = data;
109         ENTRY;
110
111         if(bits)
112                 *bits = 0;
113
114         if (!*lockh)
115                 RETURN(0);
116
117         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
118
119         LASSERT(lock != NULL);
120         lock_res_and_lock(lock);
121 #ifdef __KERNEL__
122         if (lock->l_resource->lr_lvb_inode &&
123             lock->l_resource->lr_lvb_inode != data) {
124                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125                 LASSERTF(old_inode->i_state & I_FREEING,
126                          "Found existing inode %p/%lu/%u state %lu in lock: "
127                          "setting data to %p/%lu/%u\n", old_inode,
128                          old_inode->i_ino, old_inode->i_generation,
129                          old_inode->i_state,
130                          new_inode, new_inode->i_ino, new_inode->i_generation);
131         }
132 #endif
133         lock->l_resource->lr_lvb_inode = new_inode;
134         if (bits)
135                 *bits = lock->l_policy_data.l_inodebits.bits;
136
137         unlock_res_and_lock(lock);
138         LDLM_LOCK_PUT(lock);
139
140         RETURN(0);
141 }
142
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144                            const struct lu_fid *fid, ldlm_type_t type,
145                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
146                            struct lustre_handle *lockh)
147 {
148         struct ldlm_res_id res_id;
149         ldlm_mode_t rc;
150         ENTRY;
151
152         fid_build_reg_res_name(fid, &res_id);
153         /* LU-4405: Clear bits not supported by server */
154         policy->l_inodebits.bits &= exp_connect_ibits(exp);
155         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156                              &res_id, type, policy, mode, lockh, 0);
157         RETURN(rc);
158 }
159
160 int mdc_cancel_unused(struct obd_export *exp,
161                       const struct lu_fid *fid,
162                       ldlm_policy_data_t *policy,
163                       ldlm_mode_t mode,
164                       ldlm_cancel_flags_t flags,
165                       void *opaque)
166 {
167         struct ldlm_res_id res_id;
168         struct obd_device *obd = class_exp2obd(exp);
169         int rc;
170
171         ENTRY;
172
173         fid_build_reg_res_name(fid, &res_id);
174         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175                                              policy, mode, flags, opaque);
176         RETURN(rc);
177 }
178
179 int mdc_null_inode(struct obd_export *exp,
180                    const struct lu_fid *fid)
181 {
182         struct ldlm_res_id res_id;
183         struct ldlm_resource *res;
184         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
185         ENTRY;
186
187         LASSERTF(ns != NULL, "no namespace passed\n");
188
189         fid_build_reg_res_name(fid, &res_id);
190
191         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
192         if(res == NULL)
193                 RETURN(0);
194
195         lock_res(res);
196         res->lr_lvb_inode = NULL;
197         unlock_res(res);
198
199         ldlm_resource_putref(res);
200         RETURN(0);
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267                                                    struct lookup_intent *it,
268                                                    struct md_op_data *op_data,
269                                                    void *lmm, int lmmsize,
270                                                    void *cb_data)
271 {
272         struct ptlrpc_request *req;
273         struct obd_device     *obddev = class_exp2obd(exp);
274         struct ldlm_intent    *lit;
275         CFS_LIST_HEAD(cancels);
276         int                    count = 0;
277         int                    mode;
278         int                    rc;
279         ENTRY;
280
281         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
282
283         /* XXX: openlock is not cancelled for cross-refs. */
284         /* If inode is known, cancel conflicting OPEN locks. */
285         if (fid_is_sane(&op_data->op_fid2)) {
286                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
287                         if (it->it_flags & FMODE_WRITE)
288                                 mode = LCK_EX;
289                         else
290                                 mode = LCK_PR;
291                 } else {
292                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
293                                 mode = LCK_CW;
294 #ifdef FMODE_EXEC
295                         else if (it->it_flags & FMODE_EXEC)
296                                 mode = LCK_PR;
297 #endif
298                         else
299                                 mode = LCK_CR;
300                 }
301                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
302                                                 &cancels, mode,
303                                                 MDS_INODELOCK_OPEN);
304         }
305
306         /* If CREATE, cancel parent's UPDATE lock. */
307         if (it->it_op & IT_CREAT)
308                 mode = LCK_EX;
309         else
310                 mode = LCK_CR;
311         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
312                                          &cancels, mode,
313                                          MDS_INODELOCK_UPDATE);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
316                                    &RQF_LDLM_INTENT_OPEN);
317         if (req == NULL) {
318                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
319                 RETURN(ERR_PTR(-ENOMEM));
320         }
321
322         /* parent capability */
323         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
324         /* child capability, reserve the size according to parent capa, it will
325          * be filled after we get the reply */
326         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
327
328         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
329                              op_data->op_namelen + 1);
330         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
331                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
332
333         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334         if (rc < 0) {
335                 ptlrpc_request_free(req);
336                 RETURN(ERR_PTR(rc));
337         }
338
339         spin_lock(&req->rq_lock);
340         req->rq_replay = req->rq_import->imp_replayable;
341         spin_unlock(&req->rq_lock);
342
343         /* pack the intent */
344         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
345         lit->opc = (__u64)it->it_op;
346
347         /* pack the intended request */
348         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
349                       lmmsize);
350
351         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
352                              obddev->u.cli.cl_max_mds_easize);
353
354         /* for remote client, fetch remote perm for current user */
355         if (client_is_remote(exp))
356                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357                                      sizeof(struct mdt_remote_perm));
358         ptlrpc_request_set_replen(req);
359         return req;
360 }
361
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364                          struct lookup_intent *it,
365                          struct md_op_data *op_data)
366 {
367         struct ptlrpc_request   *req;
368         struct ldlm_intent      *lit;
369         int                     rc, count = 0, maxdata;
370         CFS_LIST_HEAD(cancels);
371
372         ENTRY;
373
374         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375                                         &RQF_LDLM_INTENT_GETXATTR);
376         if (req == NULL)
377                 RETURN(ERR_PTR(-ENOMEM));
378
379         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
380
381         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(ERR_PTR(rc));
385         }
386
387         /* pack the intent */
388         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389         lit->opc = IT_GETXATTR;
390
391         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
392
393         /* pack the intended request */
394         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395                         op_data->op_valid, maxdata, -1, 0);
396
397         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398                                 RCL_SERVER, maxdata);
399
400         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401                                 RCL_SERVER, maxdata);
402
403         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404                                 RCL_SERVER, maxdata);
405
406         ptlrpc_request_set_replen(req);
407
408         RETURN(req);
409 }
410
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412                                                      struct lookup_intent *it,
413                                                      struct md_op_data *op_data)
414 {
415         struct ptlrpc_request *req;
416         struct obd_device     *obddev = class_exp2obd(exp);
417         struct ldlm_intent    *lit;
418         int                    rc;
419         ENTRY;
420
421         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422                                    &RQF_LDLM_INTENT_UNLINK);
423         if (req == NULL)
424                 RETURN(ERR_PTR(-ENOMEM));
425
426         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
427         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
428                              op_data->op_namelen + 1);
429
430         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
431         if (rc) {
432                 ptlrpc_request_free(req);
433                 RETURN(ERR_PTR(rc));
434         }
435
436         /* pack the intent */
437         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
438         lit->opc = (__u64)it->it_op;
439
440         /* pack the intended request */
441         mdc_unlink_pack(req, op_data);
442
443         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444                              obddev->u.cli.cl_default_mds_easize);
445         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
446                              obddev->u.cli.cl_default_mds_cookiesize);
447         ptlrpc_request_set_replen(req);
448         RETURN(req);
449 }
450
451 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
452                                                       struct lookup_intent *it,
453                                                       struct md_op_data *op_data)
454 {
455         struct ptlrpc_request *req;
456         struct obd_device     *obddev = class_exp2obd(exp);
457         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
460                                        (client_is_remote(exp) ?
461                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
462         struct ldlm_intent    *lit;
463         int                    rc;
464         int                     easize;
465         ENTRY;
466
467         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
468                                    &RQF_LDLM_INTENT_GETATTR);
469         if (req == NULL)
470                 RETURN(ERR_PTR(-ENOMEM));
471
472         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
473         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
474                              op_data->op_namelen + 1);
475
476         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477         if (rc) {
478                 ptlrpc_request_free(req);
479                 RETURN(ERR_PTR(rc));
480         }
481
482         /* pack the intent */
483         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484         lit->opc = (__u64)it->it_op;
485
486         if (obddev->u.cli.cl_default_mds_easize > 0)
487                 easize = obddev->u.cli.cl_default_mds_easize;
488         else
489                 easize = obddev->u.cli.cl_max_mds_easize;
490
491         /* pack the intended request */
492         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
493
494         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
495         if (client_is_remote(exp))
496                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
497                                      sizeof(struct mdt_remote_perm));
498         ptlrpc_request_set_replen(req);
499         RETURN(req);
500 }
501
502 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
503                                                      struct lookup_intent *it,
504                                                      struct md_op_data *unused)
505 {
506         struct obd_device     *obd = class_exp2obd(exp);
507         struct ptlrpc_request *req;
508         struct ldlm_intent    *lit;
509         struct layout_intent  *layout;
510         int rc;
511         ENTRY;
512
513         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
514                                 &RQF_LDLM_INTENT_LAYOUT);
515         if (req == NULL)
516                 RETURN(ERR_PTR(-ENOMEM));
517
518         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
519         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
520         if (rc) {
521                 ptlrpc_request_free(req);
522                 RETURN(ERR_PTR(rc));
523         }
524
525         /* pack the intent */
526         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
527         lit->opc = (__u64)it->it_op;
528
529         /* pack the layout intent request */
530         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
531         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
532          * set for replication */
533         layout->li_opc = LAYOUT_INTENT_ACCESS;
534
535         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
536                              obd->u.cli.cl_default_mds_easize);
537         ptlrpc_request_set_replen(req);
538         RETURN(req);
539 }
540
541 static struct ptlrpc_request *
542 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
543 {
544         struct ptlrpc_request *req;
545         int rc;
546         ENTRY;
547
548         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
549         if (req == NULL)
550                 RETURN(ERR_PTR(-ENOMEM));
551
552         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
553         if (rc) {
554                 ptlrpc_request_free(req);
555                 RETURN(ERR_PTR(rc));
556         }
557
558         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
559         ptlrpc_request_set_replen(req);
560         RETURN(req);
561 }
562
563 static int mdc_finish_enqueue(struct obd_export *exp,
564                               struct ptlrpc_request *req,
565                               struct ldlm_enqueue_info *einfo,
566                               struct lookup_intent *it,
567                               struct lustre_handle *lockh,
568                               int rc)
569 {
570         struct req_capsule  *pill = &req->rq_pill;
571         struct ldlm_request *lockreq;
572         struct ldlm_reply   *lockrep;
573         struct lustre_intent_data *intent = &it->d.lustre;
574         struct ldlm_lock    *lock;
575         void                *lvb_data = NULL;
576         int                  lvb_len = 0;
577         ENTRY;
578
579         LASSERT(rc >= 0);
580         /* Similarly, if we're going to replay this request, we don't want to
581          * actually get a lock, just perform the intent. */
582         if (req->rq_transno || req->rq_replay) {
583                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
584                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
585         }
586
587         if (rc == ELDLM_LOCK_ABORTED) {
588                 einfo->ei_mode = 0;
589                 memset(lockh, 0, sizeof(*lockh));
590                 rc = 0;
591         } else { /* rc = 0 */
592                 lock = ldlm_handle2lock(lockh);
593                 LASSERT(lock != NULL);
594
595                 /* If the server gave us back a different lock mode, we should
596                  * fix up our variables. */
597                 if (lock->l_req_mode != einfo->ei_mode) {
598                         ldlm_lock_addref(lockh, lock->l_req_mode);
599                         ldlm_lock_decref(lockh, einfo->ei_mode);
600                         einfo->ei_mode = lock->l_req_mode;
601                 }
602                 LDLM_LOCK_PUT(lock);
603         }
604
605         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
606         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
607
608         intent->it_disposition = (int)lockrep->lock_policy_res1;
609         intent->it_status = (int)lockrep->lock_policy_res2;
610         intent->it_lock_mode = einfo->ei_mode;
611         intent->it_lock_handle = lockh->cookie;
612         intent->it_data = req;
613
614         /* Technically speaking rq_transno must already be zero if
615          * it_status is in error, so the check is a bit redundant */
616         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
617                 mdc_clear_replay_flag(req, intent->it_status);
618
619         /* If we're doing an IT_OPEN which did not result in an actual
620          * successful open, then we need to remove the bit which saves
621          * this request for unconditional replay.
622          *
623          * It's important that we do this first!  Otherwise we might exit the
624          * function without doing so, and try to replay a failed create
625          * (bug 3440) */
626         if (it->it_op & IT_OPEN && req->rq_replay &&
627             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
628                 mdc_clear_replay_flag(req, intent->it_status);
629
630         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
631                   it->it_op, intent->it_disposition, intent->it_status);
632
633         /* We know what to expect, so we do any byte flipping required here */
634         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
635                 struct mdt_body *body;
636
637                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
638                 if (body == NULL) {
639                         CERROR ("Can't swab mdt_body\n");
640                         RETURN (-EPROTO);
641                 }
642
643                 if (it_disposition(it, DISP_OPEN_OPEN) &&
644                     !it_open_error(DISP_OPEN_OPEN, it)) {
645                         /*
646                          * If this is a successful OPEN request, we need to set
647                          * replay handler and data early, so that if replay
648                          * happens immediately after swabbing below, new reply
649                          * is swabbed by that handler correctly.
650                          */
651                         mdc_set_open_replay_data(NULL, NULL, it);
652                 }
653
654                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
655                         void *eadata;
656
657                         mdc_update_max_ea_from_body(exp, body);
658
659                         /*
660                          * The eadata is opaque; just check that it is there.
661                          * Eventually, obd_unpackmd() will check the contents.
662                          */
663                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
664                                                               body->eadatasize);
665                         if (eadata == NULL)
666                                 RETURN(-EPROTO);
667
668                         /* save lvb data and length in case this is for layout
669                          * lock */
670                         lvb_data = eadata;
671                         lvb_len = body->eadatasize;
672
673                         /*
674                          * We save the reply LOV EA in case we have to replay a
675                          * create for recovery.  If we didn't allocate a large
676                          * enough request buffer above we need to reallocate it
677                          * here to hold the actual LOV EA.
678                          *
679                          * To not save LOV EA if request is not going to replay
680                          * (for example error one).
681                          */
682                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
683                                 void *lmm;
684                                 if (req_capsule_get_size(pill, &RMF_EADATA,
685                                                          RCL_CLIENT) <
686                                     body->eadatasize)
687                                         mdc_realloc_openmsg(req, body);
688                                 else
689                                         req_capsule_shrink(pill, &RMF_EADATA,
690                                                            body->eadatasize,
691                                                            RCL_CLIENT);
692
693                                 req_capsule_set_size(pill, &RMF_EADATA,
694                                                      RCL_CLIENT,
695                                                      body->eadatasize);
696
697                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
698                                 if (lmm)
699                                         memcpy(lmm, eadata, body->eadatasize);
700                         }
701                 }
702
703                 if (body->valid & OBD_MD_FLRMTPERM) {
704                         struct mdt_remote_perm *perm;
705
706                         LASSERT(client_is_remote(exp));
707                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
708                                                 lustre_swab_mdt_remote_perm);
709                         if (perm == NULL)
710                                 RETURN(-EPROTO);
711                 }
712                 if (body->valid & OBD_MD_FLMDSCAPA) {
713                         struct lustre_capa *capa, *p;
714
715                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
716                         if (capa == NULL)
717                                 RETURN(-EPROTO);
718
719                         if (it->it_op & IT_OPEN) {
720                                 /* client fid capa will be checked in replay */
721                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
722                                 LASSERT(p);
723                                 *p = *capa;
724                         }
725                 }
726                 if (body->valid & OBD_MD_FLOSSCAPA) {
727                         struct lustre_capa *capa;
728
729                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
730                         if (capa == NULL)
731                                 RETURN(-EPROTO);
732                 }
733         } else if (it->it_op & IT_LAYOUT) {
734                 /* maybe the lock was granted right away and layout
735                  * is packed into RMF_DLM_LVB of req */
736                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
737                 if (lvb_len > 0) {
738                         lvb_data = req_capsule_server_sized_get(pill,
739                                                         &RMF_DLM_LVB, lvb_len);
740                         if (lvb_data == NULL)
741                                 RETURN(-EPROTO);
742                 }
743         }
744
745         /* fill in stripe data for layout lock */
746         lock = ldlm_handle2lock(lockh);
747         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
748                 void *lmm;
749
750                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
751                         ldlm_it2str(it->it_op), lvb_len);
752
753                 OBD_ALLOC_LARGE(lmm, lvb_len);
754                 if (lmm == NULL) {
755                         LDLM_LOCK_PUT(lock);
756                         RETURN(-ENOMEM);
757                 }
758                 memcpy(lmm, lvb_data, lvb_len);
759
760                 /* install lvb_data */
761                 lock_res_and_lock(lock);
762                 if (lock->l_lvb_data == NULL) {
763                         lock->l_lvb_type = LVB_T_LAYOUT;
764                         lock->l_lvb_data = lmm;
765                         lock->l_lvb_len = lvb_len;
766                         lmm = NULL;
767                 }
768                 unlock_res_and_lock(lock);
769                 if (lmm != NULL)
770                         OBD_FREE_LARGE(lmm, lvb_len);
771         }
772         if (lock != NULL)
773                 LDLM_LOCK_PUT(lock);
774
775         RETURN(rc);
776 }
777
778 /* We always reserve enough space in the reply packet for a stripe MD, because
779  * we don't know in advance the file type. */
780 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
781                 struct lookup_intent *it, struct md_op_data *op_data,
782                 struct lustre_handle *lockh, void *lmm, int lmmsize,
783                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
784 {
785         struct obd_device     *obddev = class_exp2obd(exp);
786         struct ptlrpc_request *req = NULL;
787         __u64                  flags, saved_flags = extra_lock_flags;
788         int                    rc;
789         struct ldlm_res_id res_id;
790         static const ldlm_policy_data_t lookup_policy =
791                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
792         static const ldlm_policy_data_t update_policy =
793                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
794         static const ldlm_policy_data_t layout_policy =
795                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
796         static const ldlm_policy_data_t getxattr_policy = {
797                               .l_inodebits = { MDS_INODELOCK_XATTR } };
798         ldlm_policy_data_t const *policy = &lookup_policy;
799         int                    generation, resends = 0;
800         struct ldlm_reply     *lockrep;
801         enum lvb_type          lvb_type = 0;
802         ENTRY;
803
804         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
805                  einfo->ei_type);
806
807         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
808
809         if (it) {
810                 saved_flags |= LDLM_FL_HAS_INTENT;
811                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
812                         policy = &update_policy;
813                 else if (it->it_op & IT_LAYOUT)
814                         policy = &layout_policy;
815                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
816                         policy = &getxattr_policy;
817         }
818
819         LASSERT(reqp == NULL);
820
821         generation = obddev->u.cli.cl_import->imp_generation;
822 resend:
823         flags = saved_flags;
824         if (!it) {
825                 /* The only way right now is FLOCK, in this case we hide flock
826                    policy as lmm, but lmmsize is 0 */
827                 LASSERT(lmm && lmmsize == 0);
828                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
829                          einfo->ei_type);
830                 policy = (ldlm_policy_data_t *)lmm;
831                 res_id.name[3] = LDLM_FLOCK;
832         } else if (it->it_op & IT_OPEN) {
833                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
834                                            einfo->ei_cbdata);
835                 policy = &update_policy;
836                 einfo->ei_cbdata = NULL;
837                 lmm = NULL;
838         } else if (it->it_op & IT_UNLINK) {
839                 req = mdc_intent_unlink_pack(exp, it, op_data);
840         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
841                 req = mdc_intent_getattr_pack(exp, it, op_data);
842         } else if (it->it_op & IT_READDIR) {
843                 req = mdc_enqueue_pack(exp, 0);
844         } else if (it->it_op & IT_LAYOUT) {
845                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
846                         RETURN(-EOPNOTSUPP);
847                 req = mdc_intent_layout_pack(exp, it, op_data);
848                 lvb_type = LVB_T_LAYOUT;
849         } else if (it->it_op & IT_GETXATTR) {
850                 req = mdc_intent_getxattr_pack(exp, it, op_data);
851         } else {
852                 LBUG();
853                 RETURN(-EINVAL);
854         }
855
856         if (IS_ERR(req))
857                 RETURN(PTR_ERR(req));
858
859         if (req != NULL && it && it->it_op & IT_CREAT)
860                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
861                  * retry logic */
862                 req->rq_no_retry_einprogress = 1;
863
864         if (resends) {
865                 req->rq_generation_set = 1;
866                 req->rq_import_generation = generation;
867                 req->rq_sent = cfs_time_current_sec() + resends;
868         }
869
870         /* It is important to obtain rpc_lock first (if applicable), so that
871          * threads that are serialised with rpc_lock are not polluting our
872          * rpcs in flight counter. We do not do flock request limiting, though*/
873         if (it) {
874                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
875                 rc = obd_get_request_slot(&obddev->u.cli);
876                 if (rc != 0) {
877                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
878                         mdc_clear_replay_flag(req, 0);
879                         ptlrpc_req_finished(req);
880                         RETURN(rc);
881                 }
882         }
883
884         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
885                               0, lvb_type, lockh, 0);
886         if (!it) {
887                 /* For flock requests we immediatelly return without further
888                    delay and let caller deal with the rest, since rest of
889                    this function metadata processing makes no sense for flock
890                    requests anyway. But in case of problem during comms with
891                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
892                    can not rely on caller and this mainly for F_UNLCKs
893                    (explicits or automatically generated by Kernel to clean
894                    current FLocks upon exit) that can't be trashed */
895                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
896                     (einfo->ei_type == LDLM_FLOCK) &&
897                     (einfo->ei_mode == LCK_NL))
898                         goto resend;
899                 RETURN(rc);
900         }
901
902         obd_put_request_slot(&obddev->u.cli);
903         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
904
905         if (rc < 0) {
906                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
907                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
908                              obddev->obd_name, rc);
909
910                 mdc_clear_replay_flag(req, rc);
911                 ptlrpc_req_finished(req);
912                 RETURN(rc);
913         }
914
915         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
916         LASSERT(lockrep != NULL);
917
918         lockrep->lock_policy_res2 =
919                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
920
921         /* Retry the create infinitely when we get -EINPROGRESS from
922          * server. This is required by the new quota design. */
923         if (it && it->it_op & IT_CREAT &&
924             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
925                 mdc_clear_replay_flag(req, rc);
926                 ptlrpc_req_finished(req);
927                 resends++;
928
929                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
930                        obddev->obd_name, resends, it->it_op,
931                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
932
933                 if (generation == obddev->u.cli.cl_import->imp_generation) {
934                         goto resend;
935                 } else {
936                         CDEBUG(D_HA, "resend cross eviction\n");
937                         RETURN(-EIO);
938                 }
939         }
940
941         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
942         if (rc < 0) {
943                 if (lustre_handle_is_used(lockh)) {
944                         ldlm_lock_decref(lockh, einfo->ei_mode);
945                         memset(lockh, 0, sizeof(*lockh));
946                 }
947                 ptlrpc_req_finished(req);
948         }
949         RETURN(rc);
950 }
951
952 static int mdc_finish_intent_lock(struct obd_export *exp,
953                                   struct ptlrpc_request *request,
954                                   struct md_op_data *op_data,
955                                   struct lookup_intent *it,
956                                   struct lustre_handle *lockh)
957 {
958         struct lustre_handle old_lock;
959         struct mdt_body *mdt_body;
960         struct ldlm_lock *lock;
961         int rc;
962         ENTRY;
963
964         LASSERT(request != NULL);
965         LASSERT(request != LP_POISON);
966         LASSERT(request->rq_repmsg != LP_POISON);
967
968         if (it->it_op & IT_READDIR)
969                 RETURN(0);
970
971         if (!it_disposition(it, DISP_IT_EXECD)) {
972                 /* The server failed before it even started executing the
973                  * intent, i.e. because it couldn't unpack the request. */
974                 LASSERT(it->d.lustre.it_status != 0);
975                 RETURN(it->d.lustre.it_status);
976         }
977         rc = it_open_error(DISP_IT_EXECD, it);
978         if (rc)
979                 RETURN(rc);
980
981         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
982         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
983
984         /* If we were revalidating a fid/name pair, mark the intent in
985          * case we fail and get called again from lookup */
986         if (fid_is_sane(&op_data->op_fid2) &&
987             it->it_create_mode & M_CHECK_STALE &&
988             it->it_op != IT_GETATTR) {
989                 /* Also: did we find the same inode? */
990                 /* sever can return one of two fids:
991                  * op_fid2 - new allocated fid - if file is created.
992                  * op_fid3 - existent fid - if file only open.
993                  * op_fid3 is saved in lmv_intent_open */
994                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
995                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
996                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
997                                "\n", PFID(&op_data->op_fid2),
998                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
999                         RETURN(-ESTALE);
1000                 }
1001         }
1002
1003         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1004         if (rc)
1005                 RETURN(rc);
1006
1007         /* keep requests around for the multiple phases of the call
1008          * this shows the DISP_XX must guarantee we make it into the call
1009          */
1010         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1011             it_disposition(it, DISP_OPEN_CREATE) &&
1012             !it_open_error(DISP_OPEN_CREATE, it)) {
1013                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1014                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1015         }
1016         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1017             it_disposition(it, DISP_OPEN_OPEN) &&
1018             !it_open_error(DISP_OPEN_OPEN, it)) {
1019                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1020                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1021                 /* BUG 11546 - eviction in the middle of open rpc processing */
1022                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1023         }
1024
1025         if (it->it_op & IT_CREAT) {
1026                 /* XXX this belongs in ll_create_it */
1027         } else if (it->it_op == IT_OPEN) {
1028                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1029         } else {
1030                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1031         }
1032
1033         /* If we already have a matching lock, then cancel the new
1034          * one.  We have to set the data here instead of in
1035          * mdc_enqueue, because we need to use the child's inode as
1036          * the l_ast_data to match, and that's not available until
1037          * intent_finish has performed the iget().) */
1038         lock = ldlm_handle2lock(lockh);
1039         if (lock) {
1040                 ldlm_policy_data_t policy = lock->l_policy_data;
1041                 LDLM_DEBUG(lock, "matching against this");
1042
1043                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1044                                          &lock->l_resource->lr_name),
1045                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1046                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1047                 LDLM_LOCK_PUT(lock);
1048
1049                 memcpy(&old_lock, lockh, sizeof(*lockh));
1050                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1051                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1052                         ldlm_lock_decref_and_cancel(lockh,
1053                                                     it->d.lustre.it_lock_mode);
1054                         memcpy(lockh, &old_lock, sizeof(old_lock));
1055                         it->d.lustre.it_lock_handle = lockh->cookie;
1056                 }
1057         }
1058         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1059                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1060                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1061         RETURN(rc);
1062 }
1063
1064 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1065                         struct lu_fid *fid, __u64 *bits)
1066 {
1067         /* We could just return 1 immediately, but since we should only
1068          * be called in revalidate_it if we already have a lock, let's
1069          * verify that. */
1070         struct ldlm_res_id res_id;
1071         struct lustre_handle lockh;
1072         ldlm_policy_data_t policy;
1073         ldlm_mode_t mode;
1074         ENTRY;
1075
1076         if (it->d.lustre.it_lock_handle) {
1077                 lockh.cookie = it->d.lustre.it_lock_handle;
1078                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1079         } else {
1080                 fid_build_reg_res_name(fid, &res_id);
1081                 switch (it->it_op) {
1082                 case IT_GETATTR:
1083                         /* File attributes are held under multiple bits:
1084                          * nlink is under lookup lock, size and times are
1085                          * under UPDATE lock and recently we've also got
1086                          * a separate permissions lock for owner/group/acl that
1087                          * were protected by lookup lock before.
1088                          * Getattr must provide all of that information,
1089                          * so we need to ensure we have all of those locks.
1090                          * Unfortunately, if the bits are split across multiple
1091                          * locks, there's no easy way to match all of them here,
1092                          * so an extra RPC would be performed to fetch all
1093                          * of those bits at once for now. */
1094                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1095                          * but for old MDTs (< 2.4), permission is covered
1096                          * by LOOKUP lock, so it needs to match all bits here.*/
1097                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1098                                                   MDS_INODELOCK_LOOKUP |
1099                                                   MDS_INODELOCK_PERM;
1100                         break;
1101                 case IT_READDIR:
1102                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1103                         break;
1104                 case IT_LAYOUT:
1105                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1106                         break;
1107                 default:
1108                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1109                         break;
1110                 }
1111
1112                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1113                                       LDLM_IBITS, &policy,
1114                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1115                                       &lockh);
1116         }
1117
1118         if (mode) {
1119                 it->d.lustre.it_lock_handle = lockh.cookie;
1120                 it->d.lustre.it_lock_mode = mode;
1121         } else {
1122                 it->d.lustre.it_lock_handle = 0;
1123                 it->d.lustre.it_lock_mode = 0;
1124         }
1125
1126         RETURN(!!mode);
1127 }
1128
1129 /*
1130  * This long block is all about fixing up the lock and request state
1131  * so that it is correct as of the moment _before_ the operation was
1132  * applied; that way, the VFS will think that everything is normal and
1133  * call Lustre's regular VFS methods.
1134  *
1135  * If we're performing a creation, that means that unless the creation
1136  * failed with EEXIST, we should fake up a negative dentry.
1137  *
1138  * For everything else, we want to lookup to succeed.
1139  *
1140  * One additional note: if CREATE or OPEN succeeded, we add an extra
1141  * reference to the request because we need to keep it around until
1142  * ll_create/ll_open gets called.
1143  *
1144  * The server will return to us, in it_disposition, an indication of
1145  * exactly what d.lustre.it_status refers to.
1146  *
1147  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1148  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1149  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1150  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1151  * was successful.
1152  *
1153  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1154  * child lookup.
1155  */
1156 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1157                     void *lmm, int lmmsize, struct lookup_intent *it,
1158                     int lookup_flags, struct ptlrpc_request **reqp,
1159                     ldlm_blocking_callback cb_blocking,
1160                     __u64 extra_lock_flags)
1161 {
1162         struct ldlm_enqueue_info einfo = {
1163                 .ei_type        = LDLM_IBITS,
1164                 .ei_mode        = it_to_lock_mode(it),
1165                 .ei_cb_bl       = cb_blocking,
1166                 .ei_cb_cp       = ldlm_completion_ast,
1167         };
1168         struct lustre_handle lockh;
1169         int rc = 0;
1170         ENTRY;
1171         LASSERT(it);
1172
1173         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1174                 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1175                 op_data->op_name, PFID(&op_data->op_fid2),
1176                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1177                 it->it_flags);
1178
1179         lockh.cookie = 0;
1180         if (fid_is_sane(&op_data->op_fid2) &&
1181             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1182                 /* We could just return 1 immediately, but since we should only
1183                  * be called in revalidate_it if we already have a lock, let's
1184                  * verify that. */
1185                 it->d.lustre.it_lock_handle = 0;
1186                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1187                 /* Only return failure if it was not GETATTR by cfid
1188                    (from inode_revalidate) */
1189                 if (rc || op_data->op_namelen != 0)
1190                         RETURN(rc);
1191         }
1192
1193         /* For case if upper layer did not alloc fid, do it now. */
1194         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1195                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1196                 if (rc < 0) {
1197                         CERROR("Can't alloc new fid, rc %d\n", rc);
1198                         RETURN(rc);
1199                 }
1200         }
1201         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1202                          extra_lock_flags);
1203         if (rc < 0)
1204                 RETURN(rc);
1205
1206         *reqp = it->d.lustre.it_data;
1207         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1208         RETURN(rc);
1209 }
1210
1211 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1212                                               struct ptlrpc_request *req,
1213                                               void *args, int rc)
1214 {
1215         struct mdc_getattr_args  *ga = args;
1216         struct obd_export        *exp = ga->ga_exp;
1217         struct md_enqueue_info   *minfo = ga->ga_minfo;
1218         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1219         struct lookup_intent     *it;
1220         struct lustre_handle     *lockh;
1221         struct obd_device        *obddev;
1222         struct ldlm_reply        *lockrep;
1223         __u64                     flags = LDLM_FL_HAS_INTENT;
1224         ENTRY;
1225
1226         it    = &minfo->mi_it;
1227         lockh = &minfo->mi_lockh;
1228
1229         obddev = class_exp2obd(exp);
1230
1231         obd_put_request_slot(&obddev->u.cli);
1232         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1233                 rc = -ETIMEDOUT;
1234
1235         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1236                                    &flags, NULL, 0, lockh, rc);
1237         if (rc < 0) {
1238                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1239                 mdc_clear_replay_flag(req, rc);
1240                 GOTO(out, rc);
1241         }
1242
1243         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1244         LASSERT(lockrep != NULL);
1245
1246         lockrep->lock_policy_res2 =
1247                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1248
1249         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1250         if (rc)
1251                 GOTO(out, rc);
1252
1253         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1254         EXIT;
1255
1256 out:
1257         OBD_FREE_PTR(einfo);
1258         minfo->mi_cb(req, minfo, rc);
1259         return 0;
1260 }
1261
1262 int mdc_intent_getattr_async(struct obd_export *exp,
1263                              struct md_enqueue_info *minfo,
1264                              struct ldlm_enqueue_info *einfo)
1265 {
1266         struct md_op_data       *op_data = &minfo->mi_data;
1267         struct lookup_intent    *it = &minfo->mi_it;
1268         struct ptlrpc_request   *req;
1269         struct mdc_getattr_args *ga;
1270         struct obd_device       *obddev = class_exp2obd(exp);
1271         struct ldlm_res_id       res_id;
1272         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1273          *     for statahead currently. Consider CMD in future, such two bits
1274          *     maybe managed by different MDS, should be adjusted then. */
1275         ldlm_policy_data_t       policy = {
1276                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1277                                                          MDS_INODELOCK_UPDATE }
1278                                  };
1279         int                      rc = 0;
1280         __u64                    flags = LDLM_FL_HAS_INTENT;
1281         ENTRY;
1282
1283         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1284                 LPF64"o\n",
1285                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1286                 ldlm_it2str(it->it_op), it->it_flags);
1287
1288         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1289         req = mdc_intent_getattr_pack(exp, it, op_data);
1290         if (IS_ERR(req))
1291                 RETURN(PTR_ERR(req));
1292
1293         rc = obd_get_request_slot(&obddev->u.cli);
1294         if (rc != 0) {
1295                 ptlrpc_req_finished(req);
1296                 RETURN(rc);
1297         }
1298
1299         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1300                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1301         if (rc < 0) {
1302                 obd_put_request_slot(&obddev->u.cli);
1303                 ptlrpc_req_finished(req);
1304                 RETURN(rc);
1305         }
1306
1307         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1308         ga = ptlrpc_req_async_args(req);
1309         ga->ga_exp = exp;
1310         ga->ga_minfo = minfo;
1311         ga->ga_einfo = einfo;
1312
1313         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1314         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1315
1316         RETURN(0);
1317 }