Whamcloud - gitweb
1a71afb0044c4b09b7f30b3b0f8f7d8388d89d80
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Bundles an export with the md-level and ldlm-level enqueue descriptors
 * of one getattr request.
 * NOTE(review): the users of this structure are outside the visible part
 * of the file; presumably it is the async-args payload of an intent
 * getattr RPC -- confirm against the caller.
 */
struct mdc_getattr_args {
        /* export the request is issued on */
        struct obd_export *ga_exp;
        /* md-layer enqueue descriptor */
        struct md_enqueue_info *ga_minfo;
        /* ldlm-layer enqueue descriptor */
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_LEASE)) {
83                 if (phase >= DISP_OPEN_LEASE)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88         if (it_disposition(it, DISP_OPEN_OPEN)) {
89                 if (phase >= DISP_OPEN_OPEN)
90                         return it->d.lustre.it_status;
91                 else
92                         return 0;
93         }
94
95         if (it_disposition(it, DISP_OPEN_CREATE)) {
96                 if (phase >= DISP_OPEN_CREATE)
97                         return it->d.lustre.it_status;
98                 else
99                         return 0;
100         }
101
102         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103                 if (phase >= DISP_LOOKUP_EXECD)
104                         return it->d.lustre.it_status;
105                 else
106                         return 0;
107         }
108
109         if (it_disposition(it, DISP_IT_EXECD)) {
110                 if (phase >= DISP_IT_EXECD)
111                         return it->d.lustre.it_status;
112                 else
113                         return 0;
114         }
115         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116                it->d.lustre.it_status);
117         LBUG();
118         return 0;
119 }
120 EXPORT_SYMBOL(it_open_error);
121
122 /* this must be called on a lockh that is known to have a referenced lock */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
124                       __u64 *bits)
125 {
126         struct ldlm_lock *lock;
127         struct inode *new_inode = data;
128         ENTRY;
129
130         if(bits)
131                 *bits = 0;
132
133         if (!*lockh)
134                 RETURN(0);
135
136         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137
138         LASSERT(lock != NULL);
139         lock_res_and_lock(lock);
140 #ifdef __KERNEL__
141         if (lock->l_resource->lr_lvb_inode &&
142             lock->l_resource->lr_lvb_inode != data) {
143                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144                 LASSERTF(old_inode->i_state & I_FREEING,
145                          "Found existing inode %p/%lu/%u state %lu in lock: "
146                          "setting data to %p/%lu/%u\n", old_inode,
147                          old_inode->i_ino, old_inode->i_generation,
148                          old_inode->i_state,
149                          new_inode, new_inode->i_ino, new_inode->i_generation);
150         }
151 #endif
152         lock->l_resource->lr_lvb_inode = new_inode;
153         if (bits)
154                 *bits = lock->l_policy_data.l_inodebits.bits;
155
156         unlock_res_and_lock(lock);
157         LDLM_LOCK_PUT(lock);
158
159         RETURN(0);
160 }
161
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163                            const struct lu_fid *fid, ldlm_type_t type,
164                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
165                            struct lustre_handle *lockh)
166 {
167         struct ldlm_res_id res_id;
168         ldlm_mode_t rc;
169         ENTRY;
170
171         fid_build_reg_res_name(fid, &res_id);
172         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173                              &res_id, type, policy, mode, lockh, 0);
174         RETURN(rc);
175 }
176
177 int mdc_cancel_unused(struct obd_export *exp,
178                       const struct lu_fid *fid,
179                       ldlm_policy_data_t *policy,
180                       ldlm_mode_t mode,
181                       ldlm_cancel_flags_t flags,
182                       void *opaque)
183 {
184         struct ldlm_res_id res_id;
185         struct obd_device *obd = class_exp2obd(exp);
186         int rc;
187
188         ENTRY;
189
190         fid_build_reg_res_name(fid, &res_id);
191         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192                                              policy, mode, flags, opaque);
193         RETURN(rc);
194 }
195
196 int mdc_null_inode(struct obd_export *exp,
197                    const struct lu_fid *fid)
198 {
199         struct ldlm_res_id res_id;
200         struct ldlm_resource *res;
201         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
202         ENTRY;
203
204         LASSERTF(ns != NULL, "no namespace passed\n");
205
206         fid_build_reg_res_name(fid, &res_id);
207
208         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
209         if(res == NULL)
210                 RETURN(0);
211
212         lock_res(res);
213         res->lr_lvb_inode = NULL;
214         unlock_res(res);
215
216         ldlm_resource_putref(res);
217         RETURN(0);
218 }
219
220 /* find any ldlm lock of the inode in mdc
221  * return 0    not find
222  *        1    find one
223  *      < 0    error */
224 int mdc_find_cbdata(struct obd_export *exp,
225                     const struct lu_fid *fid,
226                     ldlm_iterator_t it, void *data)
227 {
228         struct ldlm_res_id res_id;
229         int rc = 0;
230         ENTRY;
231
232         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
234                                    it, data);
235         if (rc == LDLM_ITER_STOP)
236                 RETURN(1);
237         else if (rc == LDLM_ITER_CONTINUE)
238                 RETURN(0);
239         RETURN(rc);
240 }
241
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
243 {
244         /* Don't hold error requests for replay. */
245         if (req->rq_replay) {
246                 spin_lock(&req->rq_lock);
247                 req->rq_replay = 0;
248                 spin_unlock(&req->rq_lock);
249         }
250         if (rc && req->rq_transno != 0) {
251                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
252                 LBUG();
253         }
254 }
255
256 /* Save a large LOV EA into the request buffer so that it is available
257  * for replay.  We don't do this in the initial request because the
258  * original request doesn't need this buffer (at most it sends just the
259  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260  * buffer and may also be difficult to allocate and save a very large
261  * request buffer for each open. (bug 5707)
262  *
263  * OOM here may cause recovery failure if lmm is needed (only for the
264  * original open if the MDS crashed just when this client also OOM'd)
265  * but this is incredibly unlikely, and questionable whether the client
266  * could do MDS recovery under OOM anyways... */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268                                 struct mdt_body *body)
269 {
270         int     rc;
271
272         /* FIXME: remove this explicit offset. */
273         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
274                                         body->eadatasize);
275         if (rc) {
276                 CERROR("Can't enlarge segment %d size to %d\n",
277                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
278                 body->valid &= ~OBD_MD_FLEASIZE;
279                 body->eadatasize = 0;
280         }
281 }
282
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284                                                    struct lookup_intent *it,
285                                                    struct md_op_data *op_data,
286                                                    void *lmm, int lmmsize,
287                                                    void *cb_data)
288 {
289         struct ptlrpc_request *req;
290         struct obd_device     *obddev = class_exp2obd(exp);
291         struct ldlm_intent    *lit;
292         CFS_LIST_HEAD(cancels);
293         int                    count = 0;
294         int                    mode;
295         int                    rc;
296         ENTRY;
297
298         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
299
300         /* XXX: openlock is not cancelled for cross-refs. */
301         /* If inode is known, cancel conflicting OPEN locks. */
302         if (fid_is_sane(&op_data->op_fid2)) {
303                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304                         if (it->it_flags & FMODE_WRITE)
305                                 mode = LCK_EX;
306                         else
307                                 mode = LCK_PR;
308                 } else {
309                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
310                                 mode = LCK_CW;
311 #ifdef FMODE_EXEC
312                         else if (it->it_flags & FMODE_EXEC)
313                                 mode = LCK_PR;
314 #endif
315                         else
316                                 mode = LCK_CR;
317                 }
318                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
319                                                 &cancels, mode,
320                                                 MDS_INODELOCK_OPEN);
321         }
322
323         /* If CREATE, cancel parent's UPDATE lock. */
324         if (it->it_op & IT_CREAT)
325                 mode = LCK_EX;
326         else
327                 mode = LCK_CR;
328         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
329                                          &cancels, mode,
330                                          MDS_INODELOCK_UPDATE);
331
332         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333                                    &RQF_LDLM_INTENT_OPEN);
334         if (req == NULL) {
335                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336                 RETURN(ERR_PTR(-ENOMEM));
337         }
338
339         /* parent capability */
340         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341         /* child capability, reserve the size according to parent capa, it will
342          * be filled after we get the reply */
343         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
344
345         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346                              op_data->op_namelen + 1);
347         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
349
350         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351         if (rc) {
352                 ptlrpc_request_free(req);
353                 return NULL;
354         }
355
356         spin_lock(&req->rq_lock);
357         req->rq_replay = req->rq_import->imp_replayable;
358         spin_unlock(&req->rq_lock);
359
360         /* pack the intent */
361         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362         lit->opc = (__u64)it->it_op;
363
364         /* pack the intended request */
365         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
366                       lmmsize);
367
368         /* for remote client, fetch remote perm for current user */
369         if (client_is_remote(exp))
370                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371                                      sizeof(struct mdt_remote_perm));
372         ptlrpc_request_set_replen(req);
373         return req;
374 }
375
376 static struct ptlrpc_request *
377 mdc_intent_getxattr_pack(struct obd_export *exp,
378                          struct lookup_intent *it,
379                          struct md_op_data *op_data)
380 {
381         struct ptlrpc_request   *req;
382         struct ldlm_intent      *lit;
383         int                     rc, count = 0, maxdata;
384         CFS_LIST_HEAD(cancels);
385
386         ENTRY;
387
388         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
389                                         &RQF_LDLM_INTENT_GETXATTR);
390         if (req == NULL)
391                 RETURN(ERR_PTR(-ENOMEM));
392
393         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
394
395         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = IT_GETXATTR;
404
405         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
406
407         /* pack the intended request */
408         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
409                         op_data->op_valid, maxdata, -1, 0);
410
411         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
412                                 RCL_SERVER, maxdata);
413
414         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
415                                 RCL_SERVER, maxdata);
416
417         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
418                                 RCL_SERVER, maxdata);
419
420         ptlrpc_request_set_replen(req);
421
422         RETURN(req);
423 }
424
425 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
426                                                      struct lookup_intent *it,
427                                                      struct md_op_data *op_data)
428 {
429         struct ptlrpc_request *req;
430         struct obd_device     *obddev = class_exp2obd(exp);
431         struct ldlm_intent    *lit;
432         int                    rc;
433         ENTRY;
434
435         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
436                                    &RQF_LDLM_INTENT_UNLINK);
437         if (req == NULL)
438                 RETURN(ERR_PTR(-ENOMEM));
439
440         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
441         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
442                              op_data->op_namelen + 1);
443
444         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
445         if (rc) {
446                 ptlrpc_request_free(req);
447                 RETURN(ERR_PTR(rc));
448         }
449
450         /* pack the intent */
451         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
452         lit->opc = (__u64)it->it_op;
453
454         /* pack the intended request */
455         mdc_unlink_pack(req, op_data);
456
457         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
458                              obddev->u.cli.cl_max_mds_easize);
459         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
460                              obddev->u.cli.cl_max_mds_cookiesize);
461         ptlrpc_request_set_replen(req);
462         RETURN(req);
463 }
464
465 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
466                                                       struct lookup_intent *it,
467                                                       struct md_op_data *op_data)
468 {
469         struct ptlrpc_request *req;
470         struct obd_device     *obddev = class_exp2obd(exp);
471         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
472                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
473                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
474                                        (client_is_remote(exp) ?
475                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
476         struct ldlm_intent    *lit;
477         int                    rc;
478         ENTRY;
479
480         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
481                                    &RQF_LDLM_INTENT_GETATTR);
482         if (req == NULL)
483                 RETURN(ERR_PTR(-ENOMEM));
484
485         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
486         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
487                              op_data->op_namelen + 1);
488
489         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
490         if (rc) {
491                 ptlrpc_request_free(req);
492                 RETURN(ERR_PTR(rc));
493         }
494
495         /* pack the intent */
496         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
497         lit->opc = (__u64)it->it_op;
498
499         /* pack the intended request */
500         mdc_getattr_pack(req, valid, it->it_flags, op_data,
501                          obddev->u.cli.cl_max_mds_easize);
502
503         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
504                              obddev->u.cli.cl_max_mds_easize);
505         if (client_is_remote(exp))
506                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
507                                      sizeof(struct mdt_remote_perm));
508         ptlrpc_request_set_replen(req);
509         RETURN(req);
510 }
511
512 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
513                                                      struct lookup_intent *it,
514                                                      struct md_op_data *unused)
515 {
516         struct obd_device     *obd = class_exp2obd(exp);
517         struct ptlrpc_request *req;
518         struct ldlm_intent    *lit;
519         struct layout_intent  *layout;
520         int rc;
521         ENTRY;
522
523         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
524                                 &RQF_LDLM_INTENT_LAYOUT);
525         if (req == NULL)
526                 RETURN(ERR_PTR(-ENOMEM));
527
528         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
529         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
530         if (rc) {
531                 ptlrpc_request_free(req);
532                 RETURN(ERR_PTR(rc));
533         }
534
535         /* pack the intent */
536         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
537         lit->opc = (__u64)it->it_op;
538
539         /* pack the layout intent request */
540         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
541         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
542          * set for replication */
543         layout->li_opc = LAYOUT_INTENT_ACCESS;
544
545         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
546                         obd->u.cli.cl_max_mds_easize);
547         ptlrpc_request_set_replen(req);
548         RETURN(req);
549 }
550
551 static struct ptlrpc_request *
552 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
553 {
554         struct ptlrpc_request *req;
555         int rc;
556         ENTRY;
557
558         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
559         if (req == NULL)
560                 RETURN(ERR_PTR(-ENOMEM));
561
562         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
563         if (rc) {
564                 ptlrpc_request_free(req);
565                 RETURN(ERR_PTR(rc));
566         }
567
568         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
569         ptlrpc_request_set_replen(req);
570         RETURN(req);
571 }
572
573 static int mdc_finish_enqueue(struct obd_export *exp,
574                               struct ptlrpc_request *req,
575                               struct ldlm_enqueue_info *einfo,
576                               struct lookup_intent *it,
577                               struct lustre_handle *lockh,
578                               int rc)
579 {
580         struct req_capsule  *pill = &req->rq_pill;
581         struct ldlm_request *lockreq;
582         struct ldlm_reply   *lockrep;
583         struct lustre_intent_data *intent = &it->d.lustre;
584         struct ldlm_lock    *lock;
585         void                *lvb_data = NULL;
586         int                  lvb_len = 0;
587         ENTRY;
588
589         LASSERT(rc >= 0);
590         /* Similarly, if we're going to replay this request, we don't want to
591          * actually get a lock, just perform the intent. */
592         if (req->rq_transno || req->rq_replay) {
593                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
594                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
595         }
596
597         if (rc == ELDLM_LOCK_ABORTED) {
598                 einfo->ei_mode = 0;
599                 memset(lockh, 0, sizeof(*lockh));
600                 rc = 0;
601         } else { /* rc = 0 */
602                 lock = ldlm_handle2lock(lockh);
603                 LASSERT(lock != NULL);
604
605                 /* If the server gave us back a different lock mode, we should
606                  * fix up our variables. */
607                 if (lock->l_req_mode != einfo->ei_mode) {
608                         ldlm_lock_addref(lockh, lock->l_req_mode);
609                         ldlm_lock_decref(lockh, einfo->ei_mode);
610                         einfo->ei_mode = lock->l_req_mode;
611                 }
612                 LDLM_LOCK_PUT(lock);
613         }
614
615         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
616         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
617
618         intent->it_disposition = (int)lockrep->lock_policy_res1;
619         intent->it_status = (int)lockrep->lock_policy_res2;
620         intent->it_lock_mode = einfo->ei_mode;
621         intent->it_lock_handle = lockh->cookie;
622         intent->it_data = req;
623
624         /* Technically speaking rq_transno must already be zero if
625          * it_status is in error, so the check is a bit redundant */
626         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
627                 mdc_clear_replay_flag(req, intent->it_status);
628
629         /* If we're doing an IT_OPEN which did not result in an actual
630          * successful open, then we need to remove the bit which saves
631          * this request for unconditional replay.
632          *
633          * It's important that we do this first!  Otherwise we might exit the
634          * function without doing so, and try to replay a failed create
635          * (bug 3440) */
636         if (it->it_op & IT_OPEN && req->rq_replay &&
637             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
638                 mdc_clear_replay_flag(req, intent->it_status);
639
640         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
641                   it->it_op, intent->it_disposition, intent->it_status);
642
643         /* We know what to expect, so we do any byte flipping required here */
644         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
645                 struct mdt_body *body;
646
647                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
648                 if (body == NULL) {
649                         CERROR ("Can't swab mdt_body\n");
650                         RETURN (-EPROTO);
651                 }
652
653                 if (it_disposition(it, DISP_OPEN_OPEN) &&
654                     !it_open_error(DISP_OPEN_OPEN, it)) {
655                         /*
656                          * If this is a successful OPEN request, we need to set
657                          * replay handler and data early, so that if replay
658                          * happens immediately after swabbing below, new reply
659                          * is swabbed by that handler correctly.
660                          */
661                         mdc_set_open_replay_data(NULL, NULL, req);
662                 }
663
664                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
665                         void *eadata;
666
667                         mdc_update_max_ea_from_body(exp, body);
668
669                         /*
670                          * The eadata is opaque; just check that it is there.
671                          * Eventually, obd_unpackmd() will check the contents.
672                          */
673                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
674                                                               body->eadatasize);
675                         if (eadata == NULL)
676                                 RETURN(-EPROTO);
677
678                         /* save lvb data and length in case this is for layout
679                          * lock */
680                         lvb_data = eadata;
681                         lvb_len = body->eadatasize;
682
683                         /*
684                          * We save the reply LOV EA in case we have to replay a
685                          * create for recovery.  If we didn't allocate a large
686                          * enough request buffer above we need to reallocate it
687                          * here to hold the actual LOV EA.
688                          *
689                          * To not save LOV EA if request is not going to replay
690                          * (for example error one).
691                          */
692                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
693                                 void *lmm;
694                                 if (req_capsule_get_size(pill, &RMF_EADATA,
695                                                          RCL_CLIENT) <
696                                     body->eadatasize)
697                                         mdc_realloc_openmsg(req, body);
698                                 else
699                                         req_capsule_shrink(pill, &RMF_EADATA,
700                                                            body->eadatasize,
701                                                            RCL_CLIENT);
702
703                                 req_capsule_set_size(pill, &RMF_EADATA,
704                                                      RCL_CLIENT,
705                                                      body->eadatasize);
706
707                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
708                                 if (lmm)
709                                         memcpy(lmm, eadata, body->eadatasize);
710                         }
711                 }
712
713                 if (body->valid & OBD_MD_FLRMTPERM) {
714                         struct mdt_remote_perm *perm;
715
716                         LASSERT(client_is_remote(exp));
717                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
718                                                 lustre_swab_mdt_remote_perm);
719                         if (perm == NULL)
720                                 RETURN(-EPROTO);
721                 }
722                 if (body->valid & OBD_MD_FLMDSCAPA) {
723                         struct lustre_capa *capa, *p;
724
725                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
726                         if (capa == NULL)
727                                 RETURN(-EPROTO);
728
729                         if (it->it_op & IT_OPEN) {
730                                 /* client fid capa will be checked in replay */
731                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
732                                 LASSERT(p);
733                                 *p = *capa;
734                         }
735                 }
736                 if (body->valid & OBD_MD_FLOSSCAPA) {
737                         struct lustre_capa *capa;
738
739                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
740                         if (capa == NULL)
741                                 RETURN(-EPROTO);
742                 }
743         } else if (it->it_op & IT_LAYOUT) {
744                 /* maybe the lock was granted right away and layout
745                  * is packed into RMF_DLM_LVB of req */
746                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
747                 if (lvb_len > 0) {
748                         lvb_data = req_capsule_server_sized_get(pill,
749                                                         &RMF_DLM_LVB, lvb_len);
750                         if (lvb_data == NULL)
751                                 RETURN(-EPROTO);
752                 }
753         }
754
755         /* fill in stripe data for layout lock */
756         lock = ldlm_handle2lock(lockh);
757         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
758                 void *lmm;
759
760                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
761                         ldlm_it2str(it->it_op), lvb_len);
762
763                 OBD_ALLOC_LARGE(lmm, lvb_len);
764                 if (lmm == NULL) {
765                         LDLM_LOCK_PUT(lock);
766                         RETURN(-ENOMEM);
767                 }
768                 memcpy(lmm, lvb_data, lvb_len);
769
770                 /* install lvb_data */
771                 lock_res_and_lock(lock);
772                 if (lock->l_lvb_data == NULL) {
773                         lock->l_lvb_data = lmm;
774                         lock->l_lvb_len = lvb_len;
775                         lmm = NULL;
776                 }
777                 unlock_res_and_lock(lock);
778                 if (lmm != NULL)
779                         OBD_FREE_LARGE(lmm, lvb_len);
780         }
781         if (lock != NULL)
782                 LDLM_LOCK_PUT(lock);
783
784         RETURN(rc);
785 }
786
/*
 * Enqueue an LDLM lock on the resource built from op_data->op_fid1.
 *
 * With an intent (it != NULL) the lock type must be LDLM_IBITS: a request is
 * packed according to it->it_op, sent through ldlm_cli_enqueue() between
 * mdc_get_rpc_lock()/mdc_enter_request() and their release counterparts, and
 * the reply is then passed to mdc_finish_enqueue().
 *
 * Without an intent only LDLM_FLOCK is accepted; the flock policy is passed
 * in through lmm with lmmsize == 0 and the result of ldlm_cli_enqueue() is
 * returned directly.
 *
 * reqp must be NULL (asserted below).  extra_lock_flags seeds the enqueue
 * flags; LDLM_FL_HAS_INTENT is added to them when an intent is given.
 *
 * Returns the result of mdc_finish_enqueue() (or of ldlm_cli_enqueue() for
 * flock).  Error returns visible here: the PTR_ERR() of a failed pack,
 * -EOPNOTSUPP for a layout intent when imp_connect_lvb_type() is false, the
 * error from mdc_enter_request() or ldlm_cli_enqueue(), and -EIO when the
 * import generation changed while retrying a create.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, __u64 extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        __u64                  flags, saved_flags = extra_lock_flags;
        int                    rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const ldlm_policy_data_t layout_policy =
                            { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        static const ldlm_policy_data_t getxattr_policy = {
                              .l_inodebits = { MDS_INODELOCK_XATTR } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                    generation, resends = 0;
        struct ldlm_reply     *lockrep;
        enum lvb_type          lvb_type = 0;
        ENTRY;

        /* An intent is only valid together with an inodebits lock. */
        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        /* Pick the inodebits policy from the intent; lookup_policy stays the
         * default for any operation not listed here. */
        if (it) {
                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
                else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
                        policy = &getxattr_policy;
        }

        LASSERT(reqp == NULL);

        /* Remember the import generation so that a later resend can detect
         * that it changed (see the -EINPROGRESS handling below). */
        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        /* ldlm_cli_enqueue() is handed &flags, so every attempt starts again
         * from the saved value. */
        flags = saved_flags;
        if (!it) {
                /* The only way right now is FLOCK, in this case we hide flock
                   policy as lmm, but lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                /* lmm and ei_cbdata are given to the open pack once and then
                 * cleared, so a resend packs the open without them. */
                einfo->ei_cbdata = NULL;
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_getattr_pack(exp, it, op_data);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
                        RETURN(-EOPNOTSUPP);
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
        } else {
                LBUG();
                RETURN(-EINVAL);
        }

        /* The pack helpers report failure as an error pointer; req is still
         * NULL here for the flock case, which IS_ERR() does not match. */
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (req != NULL && it && it->it_op & IT_CREAT)
                /* ask ptlrpc not to resend on EINPROGRESS since we have our own
                 * retry logic */
                req->rq_no_retry_einprogress = 1;

        /* On a retry, tie the request to the import generation saved above
         * and set rq_sent to the current time plus one second per retry
         * (presumably so that ptlrpc delays the send - TODO confirm). */
        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter. We do not do flock request limiting, though*/
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);
        if (!it) {
                /* For flock requests we immediately return without further
                   delay and let caller deal with the rest, since rest of
                   this function metadata processing makes no sense for flock
                   requests anyway. But in case of problem during comms with
                   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
                   can not rely on caller and this mainly for F_UNLCKs
                   (explicits or automatically generated by Kernel to clean
                   current FLocks upon exit) that can't be trashed */
                if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
                    (einfo->ei_type == LDLM_FLOCK) &&
                    (einfo->ei_mode == LCK_NL))
                        goto resend;
                RETURN(rc);
        }

        /* Release in the reverse order of the acquisition above. */
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Convert the intent status in the reply with ptlrpc_status_ntoh()
         * before it is interpreted below. */
        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        /* Retry the create infinitely when we get -EINPROGRESS from
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                /* Only retry while the import generation is unchanged;
                 * otherwise give up with -EIO. */
                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc < 0) {
                /* Drop the lock reference and zero the handle so that the
                 * caller is not left with a handle for a failed enqueue. */
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        memset(lockh, 0, sizeof(*lockh));
                }
                ptlrpc_req_finished(req);
        }
        RETURN(rc);
}
957
958 static int mdc_finish_intent_lock(struct obd_export *exp,
959                                   struct ptlrpc_request *request,
960                                   struct md_op_data *op_data,
961                                   struct lookup_intent *it,
962                                   struct lustre_handle *lockh)
963 {
964         struct lustre_handle old_lock;
965         struct mdt_body *mdt_body;
966         struct ldlm_lock *lock;
967         int rc;
968         ENTRY;
969
970         LASSERT(request != NULL);
971         LASSERT(request != LP_POISON);
972         LASSERT(request->rq_repmsg != LP_POISON);
973
974         if (!it_disposition(it, DISP_IT_EXECD)) {
975                 /* The server failed before it even started executing the
976                  * intent, i.e. because it couldn't unpack the request. */
977                 LASSERT(it->d.lustre.it_status != 0);
978                 RETURN(it->d.lustre.it_status);
979         }
980         rc = it_open_error(DISP_IT_EXECD, it);
981         if (rc)
982                 RETURN(rc);
983
984         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
985         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
986
987         /* If we were revalidating a fid/name pair, mark the intent in
988          * case we fail and get called again from lookup */
989         if (fid_is_sane(&op_data->op_fid2) &&
990             it->it_create_mode & M_CHECK_STALE &&
991             it->it_op != IT_GETATTR) {
992                 it_set_disposition(it, DISP_ENQ_COMPLETE);
993
994                 /* Also: did we find the same inode? */
995                 /* sever can return one of two fids:
996                  * op_fid2 - new allocated fid - if file is created.
997                  * op_fid3 - existent fid - if file only open.
998                  * op_fid3 is saved in lmv_intent_open */
999                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1000                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1001                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1002                                "\n", PFID(&op_data->op_fid2),
1003                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1004                         RETURN(-ESTALE);
1005                 }
1006         }
1007
1008         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1009         if (rc)
1010                 RETURN(rc);
1011
1012         /* keep requests around for the multiple phases of the call
1013          * this shows the DISP_XX must guarantee we make it into the call
1014          */
1015         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1016             it_disposition(it, DISP_OPEN_CREATE) &&
1017             !it_open_error(DISP_OPEN_CREATE, it)) {
1018                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1019                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1020         }
1021         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1022             it_disposition(it, DISP_OPEN_OPEN) &&
1023             !it_open_error(DISP_OPEN_OPEN, it)) {
1024                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1025                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1026                 /* BUG 11546 - eviction in the middle of open rpc processing */
1027                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1028         }
1029
1030         if (it->it_op & IT_CREAT) {
1031                 /* XXX this belongs in ll_create_it */
1032         } else if (it->it_op == IT_OPEN) {
1033                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1034         } else {
1035                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1036         }
1037
1038         /* If we already have a matching lock, then cancel the new
1039          * one.  We have to set the data here instead of in
1040          * mdc_enqueue, because we need to use the child's inode as
1041          * the l_ast_data to match, and that's not available until
1042          * intent_finish has performed the iget().) */
1043         lock = ldlm_handle2lock(lockh);
1044         if (lock) {
1045                 ldlm_policy_data_t policy = lock->l_policy_data;
1046                 LDLM_DEBUG(lock, "matching against this");
1047
1048                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1049                                          &lock->l_resource->lr_name),
1050                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1051                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1052                 LDLM_LOCK_PUT(lock);
1053
1054                 memcpy(&old_lock, lockh, sizeof(*lockh));
1055                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1056                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1057                         ldlm_lock_decref_and_cancel(lockh,
1058                                                     it->d.lustre.it_lock_mode);
1059                         memcpy(lockh, &old_lock, sizeof(old_lock));
1060                         it->d.lustre.it_lock_handle = lockh->cookie;
1061                 }
1062         }
1063         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1064                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1065                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1066         RETURN(rc);
1067 }
1068
1069 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1070                         struct lu_fid *fid, __u64 *bits)
1071 {
1072         /* We could just return 1 immediately, but since we should only
1073          * be called in revalidate_it if we already have a lock, let's
1074          * verify that. */
1075         struct ldlm_res_id res_id;
1076         struct lustre_handle lockh;
1077         ldlm_policy_data_t policy;
1078         ldlm_mode_t mode;
1079         ENTRY;
1080
1081         if (it->d.lustre.it_lock_handle) {
1082                 lockh.cookie = it->d.lustre.it_lock_handle;
1083                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1084         } else {
1085                 fid_build_reg_res_name(fid, &res_id);
1086                 switch (it->it_op) {
1087                 case IT_GETATTR:
1088                         /* File attributes are held under multiple bits:
1089                          * nlink is under lookup lock, size and times are
1090                          * under UPDATE lock and recently we've also got
1091                          * a separate permissions lock for owner/group/acl that
1092                          * were protected by lookup lock before.
1093                          * Getattr must provide all of that information,
1094                          * so we need to ensure we have all of those locks.
1095                          * Unfortunately, if the bits are split across multiple
1096                          * locks, there's no easy way to match all of them here,
1097                          * so an extra RPC would be performed to fetch all
1098                          * of those bits at once for now. */
1099                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100                                                   MDS_INODELOCK_LOOKUP |
1101                                                   MDS_INODELOCK_PERM;
1102                         break;
1103                 case IT_LAYOUT:
1104                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1105                         break;
1106                 default:
1107                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1108                         break;
1109                 }
1110
1111                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1112                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1113                                        LDLM_IBITS, &policy,
1114                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1115         }
1116
1117         if (mode) {
1118                 it->d.lustre.it_lock_handle = lockh.cookie;
1119                 it->d.lustre.it_lock_mode = mode;
1120         } else {
1121                 it->d.lustre.it_lock_handle = 0;
1122                 it->d.lustre.it_lock_mode = 0;
1123         }
1124
1125         RETURN(!!mode);
1126 }
1127
1128 /*
1129  * This long block is all about fixing up the lock and request state
1130  * so that it is correct as of the moment _before_ the operation was
1131  * applied; that way, the VFS will think that everything is normal and
1132  * call Lustre's regular VFS methods.
1133  *
1134  * If we're performing a creation, that means that unless the creation
1135  * failed with EEXIST, we should fake up a negative dentry.
1136  *
1137  * For everything else, we want to lookup to succeed.
1138  *
1139  * One additional note: if CREATE or OPEN succeeded, we add an extra
1140  * reference to the request because we need to keep it around until
1141  * ll_create/ll_open gets called.
1142  *
1143  * The server will return to us, in it_disposition, an indication of
1144  * exactly what d.lustre.it_status refers to.
1145  *
1146  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1147  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1148  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1149  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1150  * was successful.
1151  *
1152  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1153  * child lookup.
1154  */
1155 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1156                     void *lmm, int lmmsize, struct lookup_intent *it,
1157                     int lookup_flags, struct ptlrpc_request **reqp,
1158                     ldlm_blocking_callback cb_blocking,
1159                     __u64 extra_lock_flags)
1160 {
1161         struct lustre_handle lockh;
1162         int rc = 0;
1163         ENTRY;
1164         LASSERT(it);
1165
1166         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1167                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1168                 op_data->op_name, PFID(&op_data->op_fid2),
1169                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1170                 it->it_flags);
1171
1172         lockh.cookie = 0;
1173         if (fid_is_sane(&op_data->op_fid2) &&
1174             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1175                 /* We could just return 1 immediately, but since we should only
1176                  * be called in revalidate_it if we already have a lock, let's
1177                  * verify that. */
1178                 it->d.lustre.it_lock_handle = 0;
1179                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1180                 /* Only return failure if it was not GETATTR by cfid
1181                    (from inode_revalidate) */
1182                 if (rc || op_data->op_namelen != 0)
1183                         RETURN(rc);
1184         }
1185
1186         /* lookup_it may be called only after revalidate_it has run, because
1187          * revalidate_it cannot return errors, only zero.  Returning zero causes
1188          * this call to lookup, which *can* return an error.
1189          *
1190          * We only want to execute the request associated with the intent one
1191          * time, however, so don't send the request again.  Instead, skip past
1192          * this and use the request from revalidate.  In this case, revalidate
1193          * never dropped its reference, so the refcounts are all OK */
1194         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1195                 struct ldlm_enqueue_info einfo = {
1196                         .ei_type        = LDLM_IBITS,
1197                         .ei_mode        = it_to_lock_mode(it),
1198                         .ei_cb_bl       = cb_blocking,
1199                         .ei_cb_cp       = ldlm_completion_ast,
1200                 };
1201
1202                 /* For case if upper layer did not alloc fid, do it now. */
1203                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1204                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1205                         if (rc < 0) {
1206                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1207                                 RETURN(rc);
1208                         }
1209                 }
1210                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1211                                  lmm, lmmsize, NULL, extra_lock_flags);
1212                 if (rc < 0)
1213                         RETURN(rc);
1214         } else if (!fid_is_sane(&op_data->op_fid2) ||
1215                    !(it->it_create_mode & M_CHECK_STALE)) {
1216                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1217                  * request referenced from this intent, saved for subsequent
1218                  * lookup.  This path is executed when we proceed to this
1219                  * lookup, so we clear DISP_ENQ_COMPLETE */
1220                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1221         }
1222         *reqp = it->d.lustre.it_data;
1223         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1224         RETURN(rc);
1225 }
1226
1227 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1228                                               struct ptlrpc_request *req,
1229                                               void *args, int rc)
1230 {
1231         struct mdc_getattr_args  *ga = args;
1232         struct obd_export        *exp = ga->ga_exp;
1233         struct md_enqueue_info   *minfo = ga->ga_minfo;
1234         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1235         struct lookup_intent     *it;
1236         struct lustre_handle     *lockh;
1237         struct obd_device        *obddev;
1238         struct ldlm_reply        *lockrep;
1239         __u64                     flags = LDLM_FL_HAS_INTENT;
1240         ENTRY;
1241
1242         it    = &minfo->mi_it;
1243         lockh = &minfo->mi_lockh;
1244
1245         obddev = class_exp2obd(exp);
1246
1247         mdc_exit_request(&obddev->u.cli);
1248         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1249                 rc = -ETIMEDOUT;
1250
1251         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1252                                    &flags, NULL, 0, lockh, rc);
1253         if (rc < 0) {
1254                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1255                 mdc_clear_replay_flag(req, rc);
1256                 GOTO(out, rc);
1257         }
1258
1259         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1260         LASSERT(lockrep != NULL);
1261
1262         lockrep->lock_policy_res2 =
1263                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1264
1265         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1266         if (rc)
1267                 GOTO(out, rc);
1268
1269         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1270         EXIT;
1271
1272 out:
1273         OBD_FREE_PTR(einfo);
1274         minfo->mi_cb(req, minfo, rc);
1275         return 0;
1276 }
1277
1278 int mdc_intent_getattr_async(struct obd_export *exp,
1279                              struct md_enqueue_info *minfo,
1280                              struct ldlm_enqueue_info *einfo)
1281 {
1282         struct md_op_data       *op_data = &minfo->mi_data;
1283         struct lookup_intent    *it = &minfo->mi_it;
1284         struct ptlrpc_request   *req;
1285         struct mdc_getattr_args *ga;
1286         struct obd_device       *obddev = class_exp2obd(exp);
1287         struct ldlm_res_id       res_id;
1288         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1289          *     for statahead currently. Consider CMD in future, such two bits
1290          *     maybe managed by different MDS, should be adjusted then. */
1291         ldlm_policy_data_t       policy = {
1292                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1293                                                          MDS_INODELOCK_UPDATE }
1294                                  };
1295         int                      rc = 0;
1296         __u64                    flags = LDLM_FL_HAS_INTENT;
1297         ENTRY;
1298
1299         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1300                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1301                 ldlm_it2str(it->it_op), it->it_flags);
1302
1303         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1304         req = mdc_intent_getattr_pack(exp, it, op_data);
1305         if (!req)
1306                 RETURN(-ENOMEM);
1307
1308         rc = mdc_enter_request(&obddev->u.cli);
1309         if (rc != 0) {
1310                 ptlrpc_req_finished(req);
1311                 RETURN(rc);
1312         }
1313
1314         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1315                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1316         if (rc < 0) {
1317                 mdc_exit_request(&obddev->u.cli);
1318                 ptlrpc_req_finished(req);
1319                 RETURN(rc);
1320         }
1321
1322         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1323         ga = ptlrpc_req_async_args(req);
1324         ga->ga_exp = exp;
1325         ga->ga_minfo = minfo;
1326         ga->ga_einfo = einfo;
1327
1328         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1329         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1330
1331         RETURN(0);
1332 }