Whamcloud - gitweb
abc2b25dee7fa6426097f3c7168ecd5b8b3b2b94
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Bundle of pointers describing one getattr enqueue.
 * NOTE(review): nothing in the visible part of this file uses the
 * structure; presumably it is handed to an asynchronous completion
 * callback -- confirm against the users further down the file.
 */
struct mdc_getattr_args {
        /* export the request was issued on */
        struct obd_export *ga_exp;
        /* metadata enqueue description */
        struct md_enqueue_info *ga_minfo;
        /* LDLM enqueue description */
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_LEASE)) {
83                 if (phase >= DISP_OPEN_LEASE)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88         if (it_disposition(it, DISP_OPEN_OPEN)) {
89                 if (phase >= DISP_OPEN_OPEN)
90                         return it->d.lustre.it_status;
91                 else
92                         return 0;
93         }
94
95         if (it_disposition(it, DISP_OPEN_CREATE)) {
96                 if (phase >= DISP_OPEN_CREATE)
97                         return it->d.lustre.it_status;
98                 else
99                         return 0;
100         }
101
102         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103                 if (phase >= DISP_LOOKUP_EXECD)
104                         return it->d.lustre.it_status;
105                 else
106                         return 0;
107         }
108
109         if (it_disposition(it, DISP_IT_EXECD)) {
110                 if (phase >= DISP_IT_EXECD)
111                         return it->d.lustre.it_status;
112                 else
113                         return 0;
114         }
115         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116                it->d.lustre.it_status);
117         LBUG();
118         return 0;
119 }
120 EXPORT_SYMBOL(it_open_error);
121
122 /* this must be called on a lockh that is known to have a referenced lock */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
124                       __u64 *bits)
125 {
126         struct ldlm_lock *lock;
127         struct inode *new_inode = data;
128         ENTRY;
129
130         if(bits)
131                 *bits = 0;
132
133         if (!*lockh)
134                 RETURN(0);
135
136         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137
138         LASSERT(lock != NULL);
139         lock_res_and_lock(lock);
140 #ifdef __KERNEL__
141         if (lock->l_resource->lr_lvb_inode &&
142             lock->l_resource->lr_lvb_inode != data) {
143                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144                 LASSERTF(old_inode->i_state & I_FREEING,
145                          "Found existing inode %p/%lu/%u state %lu in lock: "
146                          "setting data to %p/%lu/%u\n", old_inode,
147                          old_inode->i_ino, old_inode->i_generation,
148                          old_inode->i_state,
149                          new_inode, new_inode->i_ino, new_inode->i_generation);
150         }
151 #endif
152         lock->l_resource->lr_lvb_inode = new_inode;
153         if (bits)
154                 *bits = lock->l_policy_data.l_inodebits.bits;
155
156         unlock_res_and_lock(lock);
157         LDLM_LOCK_PUT(lock);
158
159         RETURN(0);
160 }
161
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163                            const struct lu_fid *fid, ldlm_type_t type,
164                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
165                            struct lustre_handle *lockh)
166 {
167         struct ldlm_res_id res_id;
168         ldlm_mode_t rc;
169         ENTRY;
170
171         fid_build_reg_res_name(fid, &res_id);
172         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173                              &res_id, type, policy, mode, lockh, 0);
174         RETURN(rc);
175 }
176
177 int mdc_cancel_unused(struct obd_export *exp,
178                       const struct lu_fid *fid,
179                       ldlm_policy_data_t *policy,
180                       ldlm_mode_t mode,
181                       ldlm_cancel_flags_t flags,
182                       void *opaque)
183 {
184         struct ldlm_res_id res_id;
185         struct obd_device *obd = class_exp2obd(exp);
186         int rc;
187
188         ENTRY;
189
190         fid_build_reg_res_name(fid, &res_id);
191         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192                                              policy, mode, flags, opaque);
193         RETURN(rc);
194 }
195
196 int mdc_null_inode(struct obd_export *exp,
197                    const struct lu_fid *fid)
198 {
199         struct ldlm_res_id res_id;
200         struct ldlm_resource *res;
201         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
202         ENTRY;
203
204         LASSERTF(ns != NULL, "no namespace passed\n");
205
206         fid_build_reg_res_name(fid, &res_id);
207
208         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
209         if(res == NULL)
210                 RETURN(0);
211
212         lock_res(res);
213         res->lr_lvb_inode = NULL;
214         unlock_res(res);
215
216         ldlm_resource_putref(res);
217         RETURN(0);
218 }
219
220 /* find any ldlm lock of the inode in mdc
221  * return 0    not find
222  *        1    find one
223  *      < 0    error */
224 int mdc_find_cbdata(struct obd_export *exp,
225                     const struct lu_fid *fid,
226                     ldlm_iterator_t it, void *data)
227 {
228         struct ldlm_res_id res_id;
229         int rc = 0;
230         ENTRY;
231
232         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
234                                    it, data);
235         if (rc == LDLM_ITER_STOP)
236                 RETURN(1);
237         else if (rc == LDLM_ITER_CONTINUE)
238                 RETURN(0);
239         RETURN(rc);
240 }
241
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
243 {
244         /* Don't hold error requests for replay. */
245         if (req->rq_replay) {
246                 spin_lock(&req->rq_lock);
247                 req->rq_replay = 0;
248                 spin_unlock(&req->rq_lock);
249         }
250         if (rc && req->rq_transno != 0) {
251                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
252                 LBUG();
253         }
254 }
255
256 /* Save a large LOV EA into the request buffer so that it is available
257  * for replay.  We don't do this in the initial request because the
258  * original request doesn't need this buffer (at most it sends just the
259  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260  * buffer and may also be difficult to allocate and save a very large
261  * request buffer for each open. (bug 5707)
262  *
263  * OOM here may cause recovery failure if lmm is needed (only for the
264  * original open if the MDS crashed just when this client also OOM'd)
265  * but this is incredibly unlikely, and questionable whether the client
266  * could do MDS recovery under OOM anyways... */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268                                 struct mdt_body *body)
269 {
270         int     rc;
271
272         /* FIXME: remove this explicit offset. */
273         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
274                                         body->eadatasize);
275         if (rc) {
276                 CERROR("Can't enlarge segment %d size to %d\n",
277                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
278                 body->valid &= ~OBD_MD_FLEASIZE;
279                 body->eadatasize = 0;
280         }
281 }
282
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284                                                    struct lookup_intent *it,
285                                                    struct md_op_data *op_data,
286                                                    void *lmm, int lmmsize,
287                                                    void *cb_data)
288 {
289         struct ptlrpc_request *req;
290         struct obd_device     *obddev = class_exp2obd(exp);
291         struct ldlm_intent    *lit;
292         CFS_LIST_HEAD(cancels);
293         int                    count = 0;
294         int                    mode;
295         int                    rc;
296         ENTRY;
297
298         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
299
300         /* XXX: openlock is not cancelled for cross-refs. */
301         /* If inode is known, cancel conflicting OPEN locks. */
302         if (fid_is_sane(&op_data->op_fid2)) {
303                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304                         if (it->it_flags & FMODE_WRITE)
305                                 mode = LCK_EX;
306                         else
307                                 mode = LCK_PR;
308                 } else {
309                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
310                                 mode = LCK_CW;
311 #ifdef FMODE_EXEC
312                         else if (it->it_flags & FMODE_EXEC)
313                                 mode = LCK_PR;
314 #endif
315                         else
316                                 mode = LCK_CR;
317                 }
318                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
319                                                 &cancels, mode,
320                                                 MDS_INODELOCK_OPEN);
321         }
322
323         /* If CREATE, cancel parent's UPDATE lock. */
324         if (it->it_op & IT_CREAT)
325                 mode = LCK_EX;
326         else
327                 mode = LCK_CR;
328         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
329                                          &cancels, mode,
330                                          MDS_INODELOCK_UPDATE);
331
332         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333                                    &RQF_LDLM_INTENT_OPEN);
334         if (req == NULL) {
335                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336                 RETURN(ERR_PTR(-ENOMEM));
337         }
338
339         /* parent capability */
340         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341         /* child capability, reserve the size according to parent capa, it will
342          * be filled after we get the reply */
343         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
344
345         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346                              op_data->op_namelen + 1);
347         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
349
350         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351         if (rc) {
352                 ptlrpc_request_free(req);
353                 return NULL;
354         }
355
356         spin_lock(&req->rq_lock);
357         req->rq_replay = req->rq_import->imp_replayable;
358         spin_unlock(&req->rq_lock);
359
360         /* pack the intent */
361         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362         lit->opc = (__u64)it->it_op;
363
364         /* pack the intended request */
365         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
366                       lmmsize);
367
368         /* for remote client, fetch remote perm for current user */
369         if (client_is_remote(exp))
370                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371                                      sizeof(struct mdt_remote_perm));
372         ptlrpc_request_set_replen(req);
373         return req;
374 }
375
376 static struct ptlrpc_request *
377 mdc_intent_getxattr_pack(struct obd_export *exp,
378                          struct lookup_intent *it,
379                          struct md_op_data *op_data)
380 {
381         struct ptlrpc_request   *req;
382         struct ldlm_intent      *lit;
383         int                     rc, count = 0, maxdata;
384         CFS_LIST_HEAD(cancels);
385
386         ENTRY;
387
388         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
389                                         &RQF_LDLM_INTENT_GETXATTR);
390         if (req == NULL)
391                 RETURN(ERR_PTR(-ENOMEM));
392
393         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
394
395         if (it->it_op == IT_SETXATTR)
396                 /* If we want to upgrade to LCK_PW, let's cancel LCK_PR
397                  * locks now. This avoids unnecessary ASTs. */
398                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
399                                                 &cancels, LCK_PW,
400                                                 MDS_INODELOCK_XATTR);
401
402         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
403         if (rc) {
404                 ptlrpc_request_free(req);
405                 RETURN(ERR_PTR(rc));
406         }
407
408         /* pack the intent */
409         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
410         lit->opc = IT_GETXATTR;
411
412         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
413
414         /* pack the intended request */
415         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
416                         op_data->op_valid, maxdata, -1, 0);
417
418         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
419                                 RCL_SERVER, maxdata);
420
421         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
422                                 RCL_SERVER, maxdata);
423
424         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
425                                 RCL_SERVER, maxdata);
426
427         ptlrpc_request_set_replen(req);
428
429         RETURN(req);
430 }
431
432 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
433                                                      struct lookup_intent *it,
434                                                      struct md_op_data *op_data)
435 {
436         struct ptlrpc_request *req;
437         struct obd_device     *obddev = class_exp2obd(exp);
438         struct ldlm_intent    *lit;
439         int                    rc;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443                                    &RQF_LDLM_INTENT_UNLINK);
444         if (req == NULL)
445                 RETURN(ERR_PTR(-ENOMEM));
446
447         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
448         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
449                              op_data->op_namelen + 1);
450
451         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452         if (rc) {
453                 ptlrpc_request_free(req);
454                 RETURN(ERR_PTR(rc));
455         }
456
457         /* pack the intent */
458         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459         lit->opc = (__u64)it->it_op;
460
461         /* pack the intended request */
462         mdc_unlink_pack(req, op_data);
463
464         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
465                              obddev->u.cli.cl_max_mds_easize);
466         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
467                              obddev->u.cli.cl_max_mds_cookiesize);
468         ptlrpc_request_set_replen(req);
469         RETURN(req);
470 }
471
472 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
473                                                       struct lookup_intent *it,
474                                                       struct md_op_data *op_data)
475 {
476         struct ptlrpc_request *req;
477         struct obd_device     *obddev = class_exp2obd(exp);
478         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
479                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
480                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
481                                        (client_is_remote(exp) ?
482                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
483         struct ldlm_intent    *lit;
484         int                    rc;
485         ENTRY;
486
487         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
488                                    &RQF_LDLM_INTENT_GETATTR);
489         if (req == NULL)
490                 RETURN(ERR_PTR(-ENOMEM));
491
492         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
493         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
494                              op_data->op_namelen + 1);
495
496         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
497         if (rc) {
498                 ptlrpc_request_free(req);
499                 RETURN(ERR_PTR(rc));
500         }
501
502         /* pack the intent */
503         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
504         lit->opc = (__u64)it->it_op;
505
506         /* pack the intended request */
507         mdc_getattr_pack(req, valid, it->it_flags, op_data,
508                          obddev->u.cli.cl_max_mds_easize);
509
510         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
511                              obddev->u.cli.cl_max_mds_easize);
512         if (client_is_remote(exp))
513                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
514                                      sizeof(struct mdt_remote_perm));
515         ptlrpc_request_set_replen(req);
516         RETURN(req);
517 }
518
519 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
520                                                      struct lookup_intent *it,
521                                                      struct md_op_data *unused)
522 {
523         struct obd_device     *obd = class_exp2obd(exp);
524         struct ptlrpc_request *req;
525         struct ldlm_intent    *lit;
526         struct layout_intent  *layout;
527         int rc;
528         ENTRY;
529
530         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
531                                 &RQF_LDLM_INTENT_LAYOUT);
532         if (req == NULL)
533                 RETURN(ERR_PTR(-ENOMEM));
534
535         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
536         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 RETURN(ERR_PTR(rc));
540         }
541
542         /* pack the intent */
543         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
544         lit->opc = (__u64)it->it_op;
545
546         /* pack the layout intent request */
547         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
548         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
549          * set for replication */
550         layout->li_opc = LAYOUT_INTENT_ACCESS;
551
552         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
553                         obd->u.cli.cl_max_mds_easize);
554         ptlrpc_request_set_replen(req);
555         RETURN(req);
556 }
557
558 static struct ptlrpc_request *
559 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
560 {
561         struct ptlrpc_request *req;
562         int rc;
563         ENTRY;
564
565         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
566         if (req == NULL)
567                 RETURN(ERR_PTR(-ENOMEM));
568
569         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
570         if (rc) {
571                 ptlrpc_request_free(req);
572                 RETURN(ERR_PTR(rc));
573         }
574
575         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
576         ptlrpc_request_set_replen(req);
577         RETURN(req);
578 }
579
580 static int mdc_finish_enqueue(struct obd_export *exp,
581                               struct ptlrpc_request *req,
582                               struct ldlm_enqueue_info *einfo,
583                               struct lookup_intent *it,
584                               struct lustre_handle *lockh,
585                               int rc)
586 {
587         struct req_capsule  *pill = &req->rq_pill;
588         struct ldlm_request *lockreq;
589         struct ldlm_reply   *lockrep;
590         struct lustre_intent_data *intent = &it->d.lustre;
591         struct ldlm_lock    *lock;
592         void                *lvb_data = NULL;
593         int                  lvb_len = 0;
594         ENTRY;
595
596         LASSERT(rc >= 0);
597         /* Similarly, if we're going to replay this request, we don't want to
598          * actually get a lock, just perform the intent. */
599         if (req->rq_transno || req->rq_replay) {
600                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
601                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
602         }
603
604         if (rc == ELDLM_LOCK_ABORTED) {
605                 einfo->ei_mode = 0;
606                 memset(lockh, 0, sizeof(*lockh));
607                 rc = 0;
608         } else { /* rc = 0 */
609                 lock = ldlm_handle2lock(lockh);
610                 LASSERT(lock != NULL);
611
612                 /* If the server gave us back a different lock mode, we should
613                  * fix up our variables. */
614                 if (lock->l_req_mode != einfo->ei_mode) {
615                         ldlm_lock_addref(lockh, lock->l_req_mode);
616                         ldlm_lock_decref(lockh, einfo->ei_mode);
617                         einfo->ei_mode = lock->l_req_mode;
618                 }
619                 LDLM_LOCK_PUT(lock);
620         }
621
622         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
623         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
624
625         intent->it_disposition = (int)lockrep->lock_policy_res1;
626         intent->it_status = (int)lockrep->lock_policy_res2;
627         intent->it_lock_mode = einfo->ei_mode;
628         intent->it_lock_handle = lockh->cookie;
629         intent->it_data = req;
630
631         /* Technically speaking rq_transno must already be zero if
632          * it_status is in error, so the check is a bit redundant */
633         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
634                 mdc_clear_replay_flag(req, intent->it_status);
635
636         /* If we're doing an IT_OPEN which did not result in an actual
637          * successful open, then we need to remove the bit which saves
638          * this request for unconditional replay.
639          *
640          * It's important that we do this first!  Otherwise we might exit the
641          * function without doing so, and try to replay a failed create
642          * (bug 3440) */
643         if (it->it_op & IT_OPEN && req->rq_replay &&
644             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
645                 mdc_clear_replay_flag(req, intent->it_status);
646
647         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
648                   it->it_op, intent->it_disposition, intent->it_status);
649
650         /* We know what to expect, so we do any byte flipping required here */
651         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
652                 struct mdt_body *body;
653
654                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
655                 if (body == NULL) {
656                         CERROR ("Can't swab mdt_body\n");
657                         RETURN (-EPROTO);
658                 }
659
660                 if (it_disposition(it, DISP_OPEN_OPEN) &&
661                     !it_open_error(DISP_OPEN_OPEN, it)) {
662                         /*
663                          * If this is a successful OPEN request, we need to set
664                          * replay handler and data early, so that if replay
665                          * happens immediately after swabbing below, new reply
666                          * is swabbed by that handler correctly.
667                          */
668                         mdc_set_open_replay_data(NULL, NULL, req);
669                 }
670
671                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
672                         void *eadata;
673
674                         mdc_update_max_ea_from_body(exp, body);
675
676                         /*
677                          * The eadata is opaque; just check that it is there.
678                          * Eventually, obd_unpackmd() will check the contents.
679                          */
680                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
681                                                               body->eadatasize);
682                         if (eadata == NULL)
683                                 RETURN(-EPROTO);
684
685                         /* save lvb data and length in case this is for layout
686                          * lock */
687                         lvb_data = eadata;
688                         lvb_len = body->eadatasize;
689
690                         /*
691                          * We save the reply LOV EA in case we have to replay a
692                          * create for recovery.  If we didn't allocate a large
693                          * enough request buffer above we need to reallocate it
694                          * here to hold the actual LOV EA.
695                          *
696                          * To not save LOV EA if request is not going to replay
697                          * (for example error one).
698                          */
699                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
700                                 void *lmm;
701                                 if (req_capsule_get_size(pill, &RMF_EADATA,
702                                                          RCL_CLIENT) <
703                                     body->eadatasize)
704                                         mdc_realloc_openmsg(req, body);
705                                 else
706                                         req_capsule_shrink(pill, &RMF_EADATA,
707                                                            body->eadatasize,
708                                                            RCL_CLIENT);
709
710                                 req_capsule_set_size(pill, &RMF_EADATA,
711                                                      RCL_CLIENT,
712                                                      body->eadatasize);
713
714                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
715                                 if (lmm)
716                                         memcpy(lmm, eadata, body->eadatasize);
717                         }
718                 }
719
720                 if (body->valid & OBD_MD_FLRMTPERM) {
721                         struct mdt_remote_perm *perm;
722
723                         LASSERT(client_is_remote(exp));
724                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
725                                                 lustre_swab_mdt_remote_perm);
726                         if (perm == NULL)
727                                 RETURN(-EPROTO);
728                 }
729                 if (body->valid & OBD_MD_FLMDSCAPA) {
730                         struct lustre_capa *capa, *p;
731
732                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
733                         if (capa == NULL)
734                                 RETURN(-EPROTO);
735
736                         if (it->it_op & IT_OPEN) {
737                                 /* client fid capa will be checked in replay */
738                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
739                                 LASSERT(p);
740                                 *p = *capa;
741                         }
742                 }
743                 if (body->valid & OBD_MD_FLOSSCAPA) {
744                         struct lustre_capa *capa;
745
746                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
747                         if (capa == NULL)
748                                 RETURN(-EPROTO);
749                 }
750         } else if (it->it_op & IT_LAYOUT) {
751                 /* maybe the lock was granted right away and layout
752                  * is packed into RMF_DLM_LVB of req */
753                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
754                 if (lvb_len > 0) {
755                         lvb_data = req_capsule_server_sized_get(pill,
756                                                         &RMF_DLM_LVB, lvb_len);
757                         if (lvb_data == NULL)
758                                 RETURN(-EPROTO);
759                 }
760         }
761
762         /* fill in stripe data for layout lock */
763         lock = ldlm_handle2lock(lockh);
764         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
765                 void *lmm;
766
767                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
768                         ldlm_it2str(it->it_op), lvb_len);
769
770                 OBD_ALLOC_LARGE(lmm, lvb_len);
771                 if (lmm == NULL) {
772                         LDLM_LOCK_PUT(lock);
773                         RETURN(-ENOMEM);
774                 }
775                 memcpy(lmm, lvb_data, lvb_len);
776
777                 /* install lvb_data */
778                 lock_res_and_lock(lock);
779                 if (lock->l_lvb_data == NULL) {
780                         lock->l_lvb_data = lmm;
781                         lock->l_lvb_len = lvb_len;
782                         lmm = NULL;
783                 }
784                 unlock_res_and_lock(lock);
785                 if (lmm != NULL)
786                         OBD_FREE_LARGE(lmm, lvb_len);
787         }
788         if (lock != NULL)
789                 LDLM_LOCK_PUT(lock);
790
791         RETURN(rc);
792 }
793
794 /* We always reserve enough space in the reply packet for a stripe MD, because
795  * we don't know in advance the file type. */
796 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
797                 struct lookup_intent *it, struct md_op_data *op_data,
798                 struct lustre_handle *lockh, void *lmm, int lmmsize,
799                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
800 {
801         struct obd_device     *obddev = class_exp2obd(exp);
802         struct ptlrpc_request *req = NULL;
803         __u64                  flags, saved_flags = extra_lock_flags;
804         int                    rc;
805         struct ldlm_res_id res_id;
806         static const ldlm_policy_data_t lookup_policy =
807                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
808         static const ldlm_policy_data_t update_policy =
809                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
810         static const ldlm_policy_data_t layout_policy =
811                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
812         static const ldlm_policy_data_t getxattr_policy = {
813                               .l_inodebits = { MDS_INODELOCK_XATTR } };
814         ldlm_policy_data_t const *policy = &lookup_policy;
815         int                    generation, resends = 0;
816         struct ldlm_reply     *lockrep;
817         enum lvb_type          lvb_type = 0;
818         ENTRY;
819
820         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
821                  einfo->ei_type);
822
823         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
824
825         if (it) {
826                 saved_flags |= LDLM_FL_HAS_INTENT;
827                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
828                         policy = &update_policy;
829                 else if (it->it_op & IT_LAYOUT)
830                         policy = &layout_policy;
831                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
832                         policy = &getxattr_policy;
833         }
834
835         LASSERT(reqp == NULL);
836
837         generation = obddev->u.cli.cl_import->imp_generation;
838 resend:
839         flags = saved_flags;
840         if (!it) {
841                 /* The only way right now is FLOCK, in this case we hide flock
842                    policy as lmm, but lmmsize is 0 */
843                 LASSERT(lmm && lmmsize == 0);
844                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
845                          einfo->ei_type);
846                 policy = (ldlm_policy_data_t *)lmm;
847                 res_id.name[3] = LDLM_FLOCK;
848         } else if (it->it_op & IT_OPEN) {
849                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
850                                            einfo->ei_cbdata);
851                 policy = &update_policy;
852                 einfo->ei_cbdata = NULL;
853                 lmm = NULL;
854         } else if (it->it_op & IT_UNLINK) {
855                 req = mdc_intent_unlink_pack(exp, it, op_data);
856         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
857                 req = mdc_intent_getattr_pack(exp, it, op_data);
858         } else if (it->it_op & IT_READDIR) {
859                 req = mdc_enqueue_pack(exp, 0);
860         } else if (it->it_op & IT_LAYOUT) {
861                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
862                         RETURN(-EOPNOTSUPP);
863                 req = mdc_intent_layout_pack(exp, it, op_data);
864                 lvb_type = LVB_T_LAYOUT;
865         } else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) {
866                 req = mdc_intent_getxattr_pack(exp, it, op_data);
867         } else {
868                 LBUG();
869                 RETURN(-EINVAL);
870         }
871
872         if (IS_ERR(req))
873                 RETURN(PTR_ERR(req));
874
875         if (req != NULL && it && it->it_op & IT_CREAT)
876                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
877                  * retry logic */
878                 req->rq_no_retry_einprogress = 1;
879
880         if (resends) {
881                 req->rq_generation_set = 1;
882                 req->rq_import_generation = generation;
883                 req->rq_sent = cfs_time_current_sec() + resends;
884         }
885
886         /* It is important to obtain rpc_lock first (if applicable), so that
887          * threads that are serialised with rpc_lock are not polluting our
888          * rpcs in flight counter. We do not do flock request limiting, though*/
889         if (it) {
890                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
891                 rc = mdc_enter_request(&obddev->u.cli);
892                 if (rc != 0) {
893                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
894                         mdc_clear_replay_flag(req, 0);
895                         ptlrpc_req_finished(req);
896                         RETURN(rc);
897                 }
898         }
899
900         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
901                               0, lvb_type, lockh, 0);
902         if (!it) {
903                 /* For flock requests we immediatelly return without further
904                    delay and let caller deal with the rest, since rest of
905                    this function metadata processing makes no sense for flock
906                    requests anyway. But in case of problem during comms with
907                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
908                    can not rely on caller and this mainly for F_UNLCKs
909                    (explicits or automatically generated by Kernel to clean
910                    current FLocks upon exit) that can't be trashed */
911                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
912                     (einfo->ei_type == LDLM_FLOCK) &&
913                     (einfo->ei_mode == LCK_NL))
914                         goto resend;
915                 RETURN(rc);
916         }
917
918         mdc_exit_request(&obddev->u.cli);
919         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
920
921         if (rc < 0) {
922                 CERROR("ldlm_cli_enqueue: %d\n", rc);
923                 mdc_clear_replay_flag(req, rc);
924                 ptlrpc_req_finished(req);
925                 RETURN(rc);
926         }
927
928         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
929         LASSERT(lockrep != NULL);
930
931         lockrep->lock_policy_res2 =
932                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
933
934         /* Retry the create infinitely when we get -EINPROGRESS from
935          * server. This is required by the new quota design. */
936         if (it && it->it_op & IT_CREAT &&
937             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
938                 mdc_clear_replay_flag(req, rc);
939                 ptlrpc_req_finished(req);
940                 resends++;
941
942                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
943                        obddev->obd_name, resends, it->it_op,
944                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
945
946                 if (generation == obddev->u.cli.cl_import->imp_generation) {
947                         goto resend;
948                 } else {
949                         CDEBUG(D_HA, "resend cross eviction\n");
950                         RETURN(-EIO);
951                 }
952         }
953
954         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
955         if (rc < 0) {
956                 if (lustre_handle_is_used(lockh)) {
957                         ldlm_lock_decref(lockh, einfo->ei_mode);
958                         memset(lockh, 0, sizeof(*lockh));
959                 }
960                 ptlrpc_req_finished(req);
961         }
962         RETURN(rc);
963 }
964
965 static int mdc_finish_intent_lock(struct obd_export *exp,
966                                   struct ptlrpc_request *request,
967                                   struct md_op_data *op_data,
968                                   struct lookup_intent *it,
969                                   struct lustre_handle *lockh)
970 {
971         struct lustre_handle old_lock;
972         struct mdt_body *mdt_body;
973         struct ldlm_lock *lock;
974         int rc;
975         ENTRY;
976
977         LASSERT(request != NULL);
978         LASSERT(request != LP_POISON);
979         LASSERT(request->rq_repmsg != LP_POISON);
980
981         if (!it_disposition(it, DISP_IT_EXECD)) {
982                 /* The server failed before it even started executing the
983                  * intent, i.e. because it couldn't unpack the request. */
984                 LASSERT(it->d.lustre.it_status != 0);
985                 RETURN(it->d.lustre.it_status);
986         }
987         rc = it_open_error(DISP_IT_EXECD, it);
988         if (rc)
989                 RETURN(rc);
990
991         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
992         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
993
994         /* If we were revalidating a fid/name pair, mark the intent in
995          * case we fail and get called again from lookup */
996         if (fid_is_sane(&op_data->op_fid2) &&
997             it->it_create_mode & M_CHECK_STALE &&
998             it->it_op != IT_GETATTR) {
999                 it_set_disposition(it, DISP_ENQ_COMPLETE);
1000
1001                 /* Also: did we find the same inode? */
1002                 /* sever can return one of two fids:
1003                  * op_fid2 - new allocated fid - if file is created.
1004                  * op_fid3 - existent fid - if file only open.
1005                  * op_fid3 is saved in lmv_intent_open */
1006                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1007                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1008                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1009                                "\n", PFID(&op_data->op_fid2),
1010                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1011                         RETURN(-ESTALE);
1012                 }
1013         }
1014
1015         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1016         if (rc)
1017                 RETURN(rc);
1018
1019         /* keep requests around for the multiple phases of the call
1020          * this shows the DISP_XX must guarantee we make it into the call
1021          */
1022         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1023             it_disposition(it, DISP_OPEN_CREATE) &&
1024             !it_open_error(DISP_OPEN_CREATE, it)) {
1025                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1026                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1027         }
1028         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1029             it_disposition(it, DISP_OPEN_OPEN) &&
1030             !it_open_error(DISP_OPEN_OPEN, it)) {
1031                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1032                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1033                 /* BUG 11546 - eviction in the middle of open rpc processing */
1034                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1035         }
1036
1037         if (it->it_op & IT_CREAT) {
1038                 /* XXX this belongs in ll_create_it */
1039         } else if (it->it_op == IT_OPEN) {
1040                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1041         } else {
1042                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1043         }
1044
1045         /* If we already have a matching lock, then cancel the new
1046          * one.  We have to set the data here instead of in
1047          * mdc_enqueue, because we need to use the child's inode as
1048          * the l_ast_data to match, and that's not available until
1049          * intent_finish has performed the iget().) */
1050         lock = ldlm_handle2lock(lockh);
1051         if (lock) {
1052                 ldlm_policy_data_t policy = lock->l_policy_data;
1053                 LDLM_DEBUG(lock, "matching against this");
1054
1055                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1056                                          &lock->l_resource->lr_name),
1057                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1058                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1059                 LDLM_LOCK_PUT(lock);
1060
1061                 memcpy(&old_lock, lockh, sizeof(*lockh));
1062                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1063                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1064                         ldlm_lock_decref_and_cancel(lockh,
1065                                                     it->d.lustre.it_lock_mode);
1066                         memcpy(lockh, &old_lock, sizeof(old_lock));
1067                         it->d.lustre.it_lock_handle = lockh->cookie;
1068                 }
1069         }
1070         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1071                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1072                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1073         RETURN(rc);
1074 }
1075
1076 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1077                         struct lu_fid *fid, __u64 *bits)
1078 {
1079         /* We could just return 1 immediately, but since we should only
1080          * be called in revalidate_it if we already have a lock, let's
1081          * verify that. */
1082         struct ldlm_res_id res_id;
1083         struct lustre_handle lockh;
1084         ldlm_policy_data_t policy;
1085         ldlm_mode_t mode;
1086         ENTRY;
1087
1088         if (it->d.lustre.it_lock_handle) {
1089                 lockh.cookie = it->d.lustre.it_lock_handle;
1090                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1091         } else {
1092                 fid_build_reg_res_name(fid, &res_id);
1093                 switch (it->it_op) {
1094                 case IT_GETATTR:
1095                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1096                         break;
1097                 case IT_LAYOUT:
1098                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1099                         break;
1100                 default:
1101                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1102                         break;
1103                 }
1104                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1105                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1106                                        LDLM_IBITS, &policy,
1107                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1108         }
1109
1110         if (mode) {
1111                 it->d.lustre.it_lock_handle = lockh.cookie;
1112                 it->d.lustre.it_lock_mode = mode;
1113         } else {
1114                 it->d.lustre.it_lock_handle = 0;
1115                 it->d.lustre.it_lock_mode = 0;
1116         }
1117
1118         RETURN(!!mode);
1119 }
1120
1121 /*
1122  * This long block is all about fixing up the lock and request state
1123  * so that it is correct as of the moment _before_ the operation was
1124  * applied; that way, the VFS will think that everything is normal and
1125  * call Lustre's regular VFS methods.
1126  *
1127  * If we're performing a creation, that means that unless the creation
1128  * failed with EEXIST, we should fake up a negative dentry.
1129  *
1130  * For everything else, we want to lookup to succeed.
1131  *
1132  * One additional note: if CREATE or OPEN succeeded, we add an extra
1133  * reference to the request because we need to keep it around until
1134  * ll_create/ll_open gets called.
1135  *
1136  * The server will return to us, in it_disposition, an indication of
1137  * exactly what d.lustre.it_status refers to.
1138  *
1139  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1140  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1141  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1142  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1143  * was successful.
1144  *
1145  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1146  * child lookup.
1147  */
1148 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1149                     void *lmm, int lmmsize, struct lookup_intent *it,
1150                     int lookup_flags, struct ptlrpc_request **reqp,
1151                     ldlm_blocking_callback cb_blocking,
1152                     __u64 extra_lock_flags)
1153 {
1154         struct lustre_handle lockh;
1155         int rc = 0;
1156         ENTRY;
1157         LASSERT(it);
1158
1159         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1160                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1161                 op_data->op_name, PFID(&op_data->op_fid2),
1162                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1163                 it->it_flags);
1164
1165         lockh.cookie = 0;
1166         if (fid_is_sane(&op_data->op_fid2) &&
1167             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1168                 /* We could just return 1 immediately, but since we should only
1169                  * be called in revalidate_it if we already have a lock, let's
1170                  * verify that. */
1171                 it->d.lustre.it_lock_handle = 0;
1172                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1173                 /* Only return failure if it was not GETATTR by cfid
1174                    (from inode_revalidate) */
1175                 if (rc || op_data->op_namelen != 0)
1176                         RETURN(rc);
1177         }
1178
1179         /* lookup_it may be called only after revalidate_it has run, because
1180          * revalidate_it cannot return errors, only zero.  Returning zero causes
1181          * this call to lookup, which *can* return an error.
1182          *
1183          * We only want to execute the request associated with the intent one
1184          * time, however, so don't send the request again.  Instead, skip past
1185          * this and use the request from revalidate.  In this case, revalidate
1186          * never dropped its reference, so the refcounts are all OK */
1187         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1188                 struct ldlm_enqueue_info einfo = {
1189                         .ei_type        = LDLM_IBITS,
1190                         .ei_mode        = it_to_lock_mode(it),
1191                         .ei_cb_bl       = cb_blocking,
1192                         .ei_cb_cp       = ldlm_completion_ast,
1193                 };
1194
1195                 /* For case if upper layer did not alloc fid, do it now. */
1196                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1197                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1198                         if (rc < 0) {
1199                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1200                                 RETURN(rc);
1201                         }
1202                 }
1203                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1204                                  lmm, lmmsize, NULL, extra_lock_flags);
1205                 if (rc < 0)
1206                         RETURN(rc);
1207         } else if (!fid_is_sane(&op_data->op_fid2) ||
1208                    !(it->it_create_mode & M_CHECK_STALE)) {
1209                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1210                  * request referenced from this intent, saved for subsequent
1211                  * lookup.  This path is executed when we proceed to this
1212                  * lookup, so we clear DISP_ENQ_COMPLETE */
1213                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1214         }
1215         *reqp = it->d.lustre.it_data;
1216         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1217         RETURN(rc);
1218 }
1219
1220 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1221                                               struct ptlrpc_request *req,
1222                                               void *args, int rc)
1223 {
1224         struct mdc_getattr_args  *ga = args;
1225         struct obd_export        *exp = ga->ga_exp;
1226         struct md_enqueue_info   *minfo = ga->ga_minfo;
1227         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1228         struct lookup_intent     *it;
1229         struct lustre_handle     *lockh;
1230         struct obd_device        *obddev;
1231         struct ldlm_reply        *lockrep;
1232         __u64                     flags = LDLM_FL_HAS_INTENT;
1233         ENTRY;
1234
1235         it    = &minfo->mi_it;
1236         lockh = &minfo->mi_lockh;
1237
1238         obddev = class_exp2obd(exp);
1239
1240         mdc_exit_request(&obddev->u.cli);
1241         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1242                 rc = -ETIMEDOUT;
1243
1244         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1245                                    &flags, NULL, 0, lockh, rc);
1246         if (rc < 0) {
1247                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1248                 mdc_clear_replay_flag(req, rc);
1249                 GOTO(out, rc);
1250         }
1251
1252         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1253         LASSERT(lockrep != NULL);
1254
1255         lockrep->lock_policy_res2 =
1256                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1257
1258         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1259         if (rc)
1260                 GOTO(out, rc);
1261
1262         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1263         EXIT;
1264
1265 out:
1266         OBD_FREE_PTR(einfo);
1267         minfo->mi_cb(req, minfo, rc);
1268         return 0;
1269 }
1270
1271 int mdc_intent_getattr_async(struct obd_export *exp,
1272                              struct md_enqueue_info *minfo,
1273                              struct ldlm_enqueue_info *einfo)
1274 {
1275         struct md_op_data       *op_data = &minfo->mi_data;
1276         struct lookup_intent    *it = &minfo->mi_it;
1277         struct ptlrpc_request   *req;
1278         struct mdc_getattr_args *ga;
1279         struct obd_device       *obddev = class_exp2obd(exp);
1280         struct ldlm_res_id       res_id;
1281         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1282          *     for statahead currently. Consider CMD in future, such two bits
1283          *     maybe managed by different MDS, should be adjusted then. */
1284         ldlm_policy_data_t       policy = {
1285                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1286                                                          MDS_INODELOCK_UPDATE }
1287                                  };
1288         int                      rc = 0;
1289         __u64                    flags = LDLM_FL_HAS_INTENT;
1290         ENTRY;
1291
1292         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1293                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1294                 ldlm_it2str(it->it_op), it->it_flags);
1295
1296         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1297         req = mdc_intent_getattr_pack(exp, it, op_data);
1298         if (!req)
1299                 RETURN(-ENOMEM);
1300
1301         rc = mdc_enter_request(&obddev->u.cli);
1302         if (rc != 0) {
1303                 ptlrpc_req_finished(req);
1304                 RETURN(rc);
1305         }
1306
1307         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1308                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1309         if (rc < 0) {
1310                 mdc_exit_request(&obddev->u.cli);
1311                 ptlrpc_req_finished(req);
1312                 RETURN(rc);
1313         }
1314
1315         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1316         ga = ptlrpc_req_async_args(req);
1317         ga->ga_exp = exp;
1318         ga->ga_minfo = minfo;
1319         ga->ga_einfo = einfo;
1320
1321         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1322         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1323
1324         RETURN(0);
1325 }