Whamcloud - gitweb
LU-6068 misc: update Intel copyright messages 2014
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40 #include <obd.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
49
/*
 * Bundle of three pointers: an export plus the md-level and ldlm-level
 * enqueue descriptors.
 *
 * NOTE(review): nothing in the visible part of this file uses this struct;
 * presumably it carries arguments to an asynchronous getattr enqueue
 * callback -- confirm against its users.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export */
        struct md_enqueue_info *ga_minfo;       /* md enqueue info */
        struct ldlm_enqueue_info *ga_einfo;     /* ldlm enqueue info */
};
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_LEASE)) {
59                 if (phase >= DISP_OPEN_LEASE)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
100                       __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!*lockh)
110                 RETURN(0);
111
112         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137                            const struct lu_fid *fid, ldlm_type_t type,
138                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
139                            struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         ldlm_mode_t rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp,
154                       const struct lu_fid *fid,
155                       ldlm_policy_data_t *policy,
156                       ldlm_mode_t mode,
157                       ldlm_cancel_flags_t flags,
158                       void *opaque)
159 {
160         struct ldlm_res_id res_id;
161         struct obd_device *obd = class_exp2obd(exp);
162         int rc;
163
164         ENTRY;
165
166         fid_build_reg_res_name(fid, &res_id);
167         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168                                              policy, mode, flags, opaque);
169         RETURN(rc);
170 }
171
172 int mdc_null_inode(struct obd_export *exp,
173                    const struct lu_fid *fid)
174 {
175         struct ldlm_res_id res_id;
176         struct ldlm_resource *res;
177         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
178         ENTRY;
179
180         LASSERTF(ns != NULL, "no namespace passed\n");
181
182         fid_build_reg_res_name(fid, &res_id);
183
184         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185         if (IS_ERR(res))
186                 RETURN(0);
187
188         lock_res(res);
189         res->lr_lvb_inode = NULL;
190         unlock_res(res);
191
192         ldlm_resource_putref(res);
193         RETURN(0);
194 }
195
196 /* find any ldlm lock of the inode in mdc
197  * return 0    not find
198  *        1    find one
199  *      < 0    error */
200 int mdc_find_cbdata(struct obd_export *exp,
201                     const struct lu_fid *fid,
202                     ldlm_iterator_t it, void *data)
203 {
204         struct ldlm_res_id res_id;
205         int rc = 0;
206         ENTRY;
207
208         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
210                                    it, data);
211         if (rc == LDLM_ITER_STOP)
212                 RETURN(1);
213         else if (rc == LDLM_ITER_CONTINUE)
214                 RETURN(0);
215         RETURN(rc);
216 }
217
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
219 {
220         /* Don't hold error requests for replay. */
221         if (req->rq_replay) {
222                 spin_lock(&req->rq_lock);
223                 req->rq_replay = 0;
224                 spin_unlock(&req->rq_lock);
225         }
226         if (rc && req->rq_transno != 0) {
227                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
228                 LBUG();
229         }
230 }
231
232 /* Save a large LOV EA into the request buffer so that it is available
233  * for replay.  We don't do this in the initial request because the
234  * original request doesn't need this buffer (at most it sends just the
235  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236  * buffer and may also be difficult to allocate and save a very large
237  * request buffer for each open. (bug 5707)
238  *
239  * OOM here may cause recovery failure if lmm is needed (only for the
240  * original open if the MDS crashed just when this client also OOM'd)
241  * but this is incredibly unlikely, and questionable whether the client
242  * could do MDS recovery under OOM anyways... */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244                                 struct mdt_body *body)
245 {
246         int     rc;
247
248         /* FIXME: remove this explicit offset. */
249         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250                                         body->mbo_eadatasize);
251         if (rc) {
252                 CERROR("Can't enlarge segment %d size to %d\n",
253                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255                 body->mbo_eadatasize = 0;
256         }
257 }
258
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261                      struct md_op_data *op_data)
262 {
263         struct ptlrpc_request   *req;
264         struct obd_device       *obddev = class_exp2obd(exp);
265         struct ldlm_intent      *lit;
266         const void              *lmm = op_data->op_data;
267         __u32                    lmmsize = op_data->op_data_size;
268         struct list_head         cancels = LIST_HEAD_INIT(cancels);
269         int                      count = 0;
270         int                      mode;
271         int                      rc;
272         ENTRY;
273
274         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
275
276         /* XXX: openlock is not cancelled for cross-refs. */
277         /* If inode is known, cancel conflicting OPEN locks. */
278         if (fid_is_sane(&op_data->op_fid2)) {
279                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280                         if (it->it_flags & FMODE_WRITE)
281                                 mode = LCK_EX;
282                         else
283                                 mode = LCK_PR;
284                 } else {
285                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
286                                 mode = LCK_CW;
287 #ifdef FMODE_EXEC
288                         else if (it->it_flags & FMODE_EXEC)
289                                 mode = LCK_PR;
290 #endif
291                         else
292                                 mode = LCK_CR;
293                 }
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         /* parent capability */
316         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317         /* child capability, reserve the size according to parent capa, it will
318          * be filled after we get the reply */
319         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
320
321         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322                              op_data->op_namelen + 1);
323         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
325
326         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
327         if (rc < 0) {
328                 ptlrpc_request_free(req);
329                 RETURN(ERR_PTR(rc));
330         }
331
332         spin_lock(&req->rq_lock);
333         req->rq_replay = req->rq_import->imp_replayable;
334         spin_unlock(&req->rq_lock);
335
336         /* pack the intent */
337         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338         lit->opc = (__u64)it->it_op;
339
340         /* pack the intended request */
341         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342                       lmmsize);
343
344         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
345                              obddev->u.cli.cl_max_mds_easize);
346
347         /* for remote client, fetch remote perm for current user */
348         if (client_is_remote(exp))
349                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
350                                      sizeof(struct mdt_remote_perm));
351         ptlrpc_request_set_replen(req);
352         return req;
353 }
354
355 static struct ptlrpc_request *
356 mdc_intent_getxattr_pack(struct obd_export *exp,
357                          struct lookup_intent *it,
358                          struct md_op_data *op_data)
359 {
360         struct ptlrpc_request   *req;
361         struct ldlm_intent      *lit;
362         int                     rc, count = 0;
363         __u32                   maxdata;
364         struct list_head        cancels = LIST_HEAD_INIT(cancels);
365
366         ENTRY;
367
368         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369                                         &RQF_LDLM_INTENT_GETXATTR);
370         if (req == NULL)
371                 RETURN(ERR_PTR(-ENOMEM));
372
373         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374
375         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
376         if (rc) {
377                 ptlrpc_request_free(req);
378                 RETURN(ERR_PTR(rc));
379         }
380
381         /* pack the intent */
382         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
383         lit->opc = IT_GETXATTR;
384
385         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
386
387         /* pack the intended request */
388         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
389                         op_data->op_valid, maxdata, -1, 0);
390
391         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
392                                 RCL_SERVER, maxdata);
393
394         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
395                                 RCL_SERVER, maxdata);
396
397         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
398                                 RCL_SERVER, maxdata);
399
400         ptlrpc_request_set_replen(req);
401
402         RETURN(req);
403 }
404
405 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
406                                                      struct lookup_intent *it,
407                                                      struct md_op_data *op_data)
408 {
409         struct ptlrpc_request *req;
410         struct obd_device     *obddev = class_exp2obd(exp);
411         struct ldlm_intent    *lit;
412         int                    rc;
413         ENTRY;
414
415         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
416                                    &RQF_LDLM_INTENT_UNLINK);
417         if (req == NULL)
418                 RETURN(ERR_PTR(-ENOMEM));
419
420         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
421         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
422                              op_data->op_namelen + 1);
423
424         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
425         if (rc) {
426                 ptlrpc_request_free(req);
427                 RETURN(ERR_PTR(rc));
428         }
429
430         /* pack the intent */
431         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
432         lit->opc = (__u64)it->it_op;
433
434         /* pack the intended request */
435         mdc_unlink_pack(req, op_data);
436
437         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
438                              obddev->u.cli.cl_default_mds_easize);
439         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440                              obddev->u.cli.cl_default_mds_cookiesize);
441         ptlrpc_request_set_replen(req);
442         RETURN(req);
443 }
444
445 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
446                                                       struct lookup_intent *it,
447                                                       struct md_op_data *op_data)
448 {
449         struct ptlrpc_request   *req;
450         struct obd_device       *obddev = class_exp2obd(exp);
451         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
452                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
453                                          OBD_MD_FLMDSCAPA | OBD_MD_MEA |
454                                          (client_is_remote(exp) ?
455                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
456         struct ldlm_intent      *lit;
457         int                      rc;
458         __u32                    easize;
459         ENTRY;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_LDLM_INTENT_GETATTR);
463         if (req == NULL)
464                 RETURN(ERR_PTR(-ENOMEM));
465
466         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
467         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
468                              op_data->op_namelen + 1);
469
470         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
471         if (rc) {
472                 ptlrpc_request_free(req);
473                 RETURN(ERR_PTR(rc));
474         }
475
476         /* pack the intent */
477         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
478         lit->opc = (__u64)it->it_op;
479
480         if (obddev->u.cli.cl_default_mds_easize > 0)
481                 easize = obddev->u.cli.cl_default_mds_easize;
482         else
483                 easize = obddev->u.cli.cl_max_mds_easize;
484
485         /* pack the intended request */
486         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
487
488         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
489         if (client_is_remote(exp))
490                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
491                                      sizeof(struct mdt_remote_perm));
492         ptlrpc_request_set_replen(req);
493         RETURN(req);
494 }
495
496 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
497                                                      struct lookup_intent *it,
498                                                      struct md_op_data *unused)
499 {
500         struct obd_device     *obd = class_exp2obd(exp);
501         struct ptlrpc_request *req;
502         struct ldlm_intent    *lit;
503         struct layout_intent  *layout;
504         int rc;
505         ENTRY;
506
507         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
508                                 &RQF_LDLM_INTENT_LAYOUT);
509         if (req == NULL)
510                 RETURN(ERR_PTR(-ENOMEM));
511
512         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
513         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
514         if (rc) {
515                 ptlrpc_request_free(req);
516                 RETURN(ERR_PTR(rc));
517         }
518
519         /* pack the intent */
520         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
521         lit->opc = (__u64)it->it_op;
522
523         /* pack the layout intent request */
524         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
525         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
526          * set for replication */
527         layout->li_opc = LAYOUT_INTENT_ACCESS;
528
529         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
530                              obd->u.cli.cl_default_mds_easize);
531         ptlrpc_request_set_replen(req);
532         RETURN(req);
533 }
534
535 static struct ptlrpc_request *
536 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
537 {
538         struct ptlrpc_request *req;
539         int rc;
540         ENTRY;
541
542         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
543         if (req == NULL)
544                 RETURN(ERR_PTR(-ENOMEM));
545
546         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
547         if (rc) {
548                 ptlrpc_request_free(req);
549                 RETURN(ERR_PTR(rc));
550         }
551
552         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
553         ptlrpc_request_set_replen(req);
554         RETURN(req);
555 }
556
557 static int mdc_finish_enqueue(struct obd_export *exp,
558                               struct ptlrpc_request *req,
559                               struct ldlm_enqueue_info *einfo,
560                               struct lookup_intent *it,
561                               struct lustre_handle *lockh,
562                               int rc)
563 {
564         struct req_capsule  *pill = &req->rq_pill;
565         struct ldlm_request *lockreq;
566         struct ldlm_reply   *lockrep;
567         struct lustre_intent_data *intent = &it->d.lustre;
568         struct ldlm_lock    *lock;
569         void                *lvb_data = NULL;
570         __u32                lvb_len = 0;
571         ENTRY;
572
573         LASSERT(rc >= 0);
574         /* Similarly, if we're going to replay this request, we don't want to
575          * actually get a lock, just perform the intent. */
576         if (req->rq_transno || req->rq_replay) {
577                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
578                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
579         }
580
581         if (rc == ELDLM_LOCK_ABORTED) {
582                 einfo->ei_mode = 0;
583                 memset(lockh, 0, sizeof(*lockh));
584                 rc = 0;
585         } else { /* rc = 0 */
586                 lock = ldlm_handle2lock(lockh);
587                 LASSERT(lock != NULL);
588
589                 /* If the server gave us back a different lock mode, we should
590                  * fix up our variables. */
591                 if (lock->l_req_mode != einfo->ei_mode) {
592                         ldlm_lock_addref(lockh, lock->l_req_mode);
593                         ldlm_lock_decref(lockh, einfo->ei_mode);
594                         einfo->ei_mode = lock->l_req_mode;
595                 }
596                 LDLM_LOCK_PUT(lock);
597         }
598
599         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
600         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
601
602         intent->it_disposition = (int)lockrep->lock_policy_res1;
603         intent->it_status = (int)lockrep->lock_policy_res2;
604         intent->it_lock_mode = einfo->ei_mode;
605         intent->it_lock_handle = lockh->cookie;
606         intent->it_data = req;
607
608         /* Technically speaking rq_transno must already be zero if
609          * it_status is in error, so the check is a bit redundant */
610         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
611                 mdc_clear_replay_flag(req, intent->it_status);
612
613         /* If we're doing an IT_OPEN which did not result in an actual
614          * successful open, then we need to remove the bit which saves
615          * this request for unconditional replay.
616          *
617          * It's important that we do this first!  Otherwise we might exit the
618          * function without doing so, and try to replay a failed create
619          * (bug 3440) */
620         if (it->it_op & IT_OPEN && req->rq_replay &&
621             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
622                 mdc_clear_replay_flag(req, intent->it_status);
623
624         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
625                   it->it_op, intent->it_disposition, intent->it_status);
626
627         /* We know what to expect, so we do any byte flipping required here */
628         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
629                 struct mdt_body *body;
630
631                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
632                 if (body == NULL) {
633                         CERROR ("Can't swab mdt_body\n");
634                         RETURN (-EPROTO);
635                 }
636
637                 if (it_disposition(it, DISP_OPEN_OPEN) &&
638                     !it_open_error(DISP_OPEN_OPEN, it)) {
639                         /*
640                          * If this is a successful OPEN request, we need to set
641                          * replay handler and data early, so that if replay
642                          * happens immediately after swabbing below, new reply
643                          * is swabbed by that handler correctly.
644                          */
645                         mdc_set_open_replay_data(NULL, NULL, it);
646                 }
647
648                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
649                         void *eadata;
650
651                         mdc_update_max_ea_from_body(exp, body);
652
653                         /*
654                          * The eadata is opaque; just check that it is there.
655                          * Eventually, obd_unpackmd() will check the contents.
656                          */
657                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
658                                                         body->mbo_eadatasize);
659                         if (eadata == NULL)
660                                 RETURN(-EPROTO);
661
662                         /* save lvb data and length in case this is for layout
663                          * lock */
664                         lvb_data = eadata;
665                         lvb_len = body->mbo_eadatasize;
666
667                         /*
668                          * We save the reply LOV EA in case we have to replay a
669                          * create for recovery.  If we didn't allocate a large
670                          * enough request buffer above we need to reallocate it
671                          * here to hold the actual LOV EA.
672                          *
673                          * To not save LOV EA if request is not going to replay
674                          * (for example error one).
675                          */
676                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
677                                 void *lmm;
678                                 if (req_capsule_get_size(pill, &RMF_EADATA,
679                                                          RCL_CLIENT) <
680                                     body->mbo_eadatasize)
681                                         mdc_realloc_openmsg(req, body);
682                                 else
683                                         req_capsule_shrink(pill, &RMF_EADATA,
684                                                            body->mbo_eadatasize,
685                                                            RCL_CLIENT);
686
687                                 req_capsule_set_size(pill, &RMF_EADATA,
688                                                      RCL_CLIENT,
689                                                      body->mbo_eadatasize);
690
691                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
692                                 if (lmm)
693                                         memcpy(lmm, eadata,
694                                                body->mbo_eadatasize);
695                         }
696                 }
697
698                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
699                         struct mdt_remote_perm *perm;
700
701                         LASSERT(client_is_remote(exp));
702                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
703                                                 lustre_swab_mdt_remote_perm);
704                         if (perm == NULL)
705                                 RETURN(-EPROTO);
706                 }
707                 if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
708                         struct lustre_capa *capa, *p;
709
710                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
711                         if (capa == NULL)
712                                 RETURN(-EPROTO);
713
714                         if (it->it_op & IT_OPEN) {
715                                 /* client fid capa will be checked in replay */
716                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
717                                 LASSERT(p);
718                                 *p = *capa;
719                         }
720                 }
721                 if (body->mbo_valid & OBD_MD_FLOSSCAPA) {
722                         struct lustre_capa *capa;
723
724                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
725                         if (capa == NULL)
726                                 RETURN(-EPROTO);
727                 }
728         } else if (it->it_op & IT_LAYOUT) {
729                 /* maybe the lock was granted right away and layout
730                  * is packed into RMF_DLM_LVB of req */
731                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
732                 if (lvb_len > 0) {
733                         lvb_data = req_capsule_server_sized_get(pill,
734                                                         &RMF_DLM_LVB, lvb_len);
735                         if (lvb_data == NULL)
736                                 RETURN(-EPROTO);
737                 }
738         }
739
740         /* fill in stripe data for layout lock */
741         lock = ldlm_handle2lock(lockh);
742         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
743                 void *lmm;
744
745                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
746                         ldlm_it2str(it->it_op), lvb_len);
747
748                 OBD_ALLOC_LARGE(lmm, lvb_len);
749                 if (lmm == NULL) {
750                         LDLM_LOCK_PUT(lock);
751                         RETURN(-ENOMEM);
752                 }
753                 memcpy(lmm, lvb_data, lvb_len);
754
755                 /* install lvb_data */
756                 lock_res_and_lock(lock);
757                 if (lock->l_lvb_data == NULL) {
758                         lock->l_lvb_type = LVB_T_LAYOUT;
759                         lock->l_lvb_data = lmm;
760                         lock->l_lvb_len = lvb_len;
761                         lmm = NULL;
762                 }
763                 unlock_res_and_lock(lock);
764                 if (lmm != NULL)
765                         OBD_FREE_LARGE(lmm, lvb_len);
766         }
767         if (lock != NULL)
768                 LDLM_LOCK_PUT(lock);
769
770         RETURN(rc);
771 }
772
773 /* We always reserve enough space in the reply packet for a stripe MD, because
774  * we don't know in advance the file type. */
775 int mdc_enqueue(struct obd_export *exp,
776                 struct ldlm_enqueue_info *einfo,
777                 const union ldlm_policy_data *policy,
778                 struct lookup_intent *it, struct md_op_data *op_data,
779                 struct lustre_handle *lockh, __u64 extra_lock_flags)
780 {
781         struct obd_device     *obddev = class_exp2obd(exp);
782         struct ptlrpc_request *req = NULL;
783         __u64                  flags, saved_flags = extra_lock_flags;
784         int                    rc;
785         struct ldlm_res_id res_id;
786         static const ldlm_policy_data_t lookup_policy =
787                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
788         static const ldlm_policy_data_t update_policy =
789                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
790         static const ldlm_policy_data_t layout_policy =
791                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
792         static const ldlm_policy_data_t getxattr_policy = {
793                               .l_inodebits = { MDS_INODELOCK_XATTR } };
794         int                    generation, resends = 0;
795         struct ldlm_reply     *lockrep;
796         enum lvb_type          lvb_type = 0;
797         ENTRY;
798
799         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
800                  einfo->ei_type);
801         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
802
803         if (it != NULL) {
804                 LASSERT(policy == NULL);
805
806                 saved_flags |= LDLM_FL_HAS_INTENT;
807                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
808                         policy = &update_policy;
809                 else if (it->it_op & IT_LAYOUT)
810                         policy = &layout_policy;
811                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812                         policy = &getxattr_policy;
813                 else
814                         policy = &lookup_policy;
815         }
816
817         generation = obddev->u.cli.cl_import->imp_generation;
818 resend:
819         flags = saved_flags;
820         if (it == NULL) {
821                 /* The only way right now is FLOCK. */
822                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
823                          einfo->ei_type);
824                 res_id.name[3] = LDLM_FLOCK;
825         } else if (it->it_op & IT_OPEN) {
826                 req = mdc_intent_open_pack(exp, it, op_data);
827         } else if (it->it_op & IT_UNLINK) {
828                 req = mdc_intent_unlink_pack(exp, it, op_data);
829         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830                 req = mdc_intent_getattr_pack(exp, it, op_data);
831         } else if (it->it_op & IT_READDIR) {
832                 req = mdc_enqueue_pack(exp, 0);
833         } else if (it->it_op & IT_LAYOUT) {
834                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
835                         RETURN(-EOPNOTSUPP);
836                 req = mdc_intent_layout_pack(exp, it, op_data);
837                 lvb_type = LVB_T_LAYOUT;
838         } else if (it->it_op & IT_GETXATTR) {
839                 req = mdc_intent_getxattr_pack(exp, it, op_data);
840         } else {
841                 LBUG();
842                 RETURN(-EINVAL);
843         }
844
845         if (IS_ERR(req))
846                 RETURN(PTR_ERR(req));
847
848         if (req != NULL && it && it->it_op & IT_CREAT)
849                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
850                  * retry logic */
851                 req->rq_no_retry_einprogress = 1;
852
853         if (resends) {
854                 req->rq_generation_set = 1;
855                 req->rq_import_generation = generation;
856                 req->rq_sent = cfs_time_current_sec() + resends;
857         }
858
859         /* It is important to obtain rpc_lock first (if applicable), so that
860          * threads that are serialised with rpc_lock are not polluting our
861          * rpcs in flight counter. We do not do flock request limiting, though*/
862         if (it) {
863                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
864                 rc = obd_get_request_slot(&obddev->u.cli);
865                 if (rc != 0) {
866                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
867                         mdc_clear_replay_flag(req, 0);
868                         ptlrpc_req_finished(req);
869                         RETURN(rc);
870                 }
871         }
872
873         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
874                               0, lvb_type, lockh, 0);
875         if (!it) {
876                 /* For flock requests we immediatelly return without further
877                    delay and let caller deal with the rest, since rest of
878                    this function metadata processing makes no sense for flock
879                    requests anyway. But in case of problem during comms with
880                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
881                    can not rely on caller and this mainly for F_UNLCKs
882                    (explicits or automatically generated by Kernel to clean
883                    current FLocks upon exit) that can't be trashed */
884                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
885                     (einfo->ei_type == LDLM_FLOCK) &&
886                     (einfo->ei_mode == LCK_NL))
887                         goto resend;
888                 RETURN(rc);
889         }
890
891         obd_put_request_slot(&obddev->u.cli);
892         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
893
894         if (rc < 0) {
895                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
896                        obddev->obd_name, rc);
897
898                 mdc_clear_replay_flag(req, rc);
899                 ptlrpc_req_finished(req);
900                 RETURN(rc);
901         }
902
903         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
904         LASSERT(lockrep != NULL);
905
906         lockrep->lock_policy_res2 =
907                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
908
909         /* Retry the create infinitely when we get -EINPROGRESS from
910          * server. This is required by the new quota design. */
911         if (it && it->it_op & IT_CREAT &&
912             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
913                 mdc_clear_replay_flag(req, rc);
914                 ptlrpc_req_finished(req);
915                 resends++;
916
917                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
918                        obddev->obd_name, resends, it->it_op,
919                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
920
921                 if (generation == obddev->u.cli.cl_import->imp_generation) {
922                         goto resend;
923                 } else {
924                         CDEBUG(D_HA, "resend cross eviction\n");
925                         RETURN(-EIO);
926                 }
927         }
928
929         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
930         if (rc < 0) {
931                 if (lustre_handle_is_used(lockh)) {
932                         ldlm_lock_decref(lockh, einfo->ei_mode);
933                         memset(lockh, 0, sizeof(*lockh));
934                 }
935                 ptlrpc_req_finished(req);
936
937                 it->d.lustre.it_lock_handle = 0;
938                 it->d.lustre.it_lock_mode = 0;
939                 it->d.lustre.it_data = NULL;
940         }
941
942         RETURN(rc);
943 }
944
945 static int mdc_finish_intent_lock(struct obd_export *exp,
946                                   struct ptlrpc_request *request,
947                                   struct md_op_data *op_data,
948                                   struct lookup_intent *it,
949                                   struct lustre_handle *lockh)
950 {
951         struct lustre_handle old_lock;
952         struct mdt_body *mdt_body;
953         struct ldlm_lock *lock;
954         int rc;
955         ENTRY;
956
957         LASSERT(request != NULL);
958         LASSERT(request != LP_POISON);
959         LASSERT(request->rq_repmsg != LP_POISON);
960
961         if (it->it_op & IT_READDIR)
962                 RETURN(0);
963
964         if (!it_disposition(it, DISP_IT_EXECD)) {
965                 /* The server failed before it even started executing the
966                  * intent, i.e. because it couldn't unpack the request. */
967                 LASSERT(it->d.lustre.it_status != 0);
968                 RETURN(it->d.lustre.it_status);
969         }
970         rc = it_open_error(DISP_IT_EXECD, it);
971         if (rc)
972                 RETURN(rc);
973
974         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
975         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
976
977         rc = it_open_error(DISP_LOOKUP_EXECD, it);
978         if (rc)
979                 RETURN(rc);
980
981         /* keep requests around for the multiple phases of the call
982          * this shows the DISP_XX must guarantee we make it into the call
983          */
984         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
985             it_disposition(it, DISP_OPEN_CREATE) &&
986             !it_open_error(DISP_OPEN_CREATE, it)) {
987                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
988                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
989         }
990         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
991             it_disposition(it, DISP_OPEN_OPEN) &&
992             !it_open_error(DISP_OPEN_OPEN, it)) {
993                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
994                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
995                 /* BUG 11546 - eviction in the middle of open rpc processing */
996                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
997         }
998
999         if (it->it_op & IT_CREAT) {
1000                 /* XXX this belongs in ll_create_it */
1001         } else if (it->it_op == IT_OPEN) {
1002                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1003         } else {
1004                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1005         }
1006
1007         /* If we already have a matching lock, then cancel the new
1008          * one.  We have to set the data here instead of in
1009          * mdc_enqueue, because we need to use the child's inode as
1010          * the l_ast_data to match, and that's not available until
1011          * intent_finish has performed the iget().) */
1012         lock = ldlm_handle2lock(lockh);
1013         if (lock) {
1014                 ldlm_policy_data_t policy = lock->l_policy_data;
1015                 LDLM_DEBUG(lock, "matching against this");
1016
1017                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
1018                                          &lock->l_resource->lr_name),
1019                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1020                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
1021                 LDLM_LOCK_PUT(lock);
1022
1023                 memcpy(&old_lock, lockh, sizeof(*lockh));
1024                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1025                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1026                         ldlm_lock_decref_and_cancel(lockh,
1027                                                     it->d.lustre.it_lock_mode);
1028                         memcpy(lockh, &old_lock, sizeof(old_lock));
1029                         it->d.lustre.it_lock_handle = lockh->cookie;
1030                 }
1031         }
1032         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1033                 (int)op_data->op_namelen, op_data->op_name,
1034                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1035                 it->d.lustre.it_disposition, rc);
1036         RETURN(rc);
1037 }
1038
1039 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1040                         struct lu_fid *fid, __u64 *bits)
1041 {
1042         /* We could just return 1 immediately, but since we should only
1043          * be called in revalidate_it if we already have a lock, let's
1044          * verify that. */
1045         struct ldlm_res_id res_id;
1046         struct lustre_handle lockh;
1047         ldlm_policy_data_t policy;
1048         ldlm_mode_t mode;
1049         ENTRY;
1050
1051         if (it->d.lustre.it_lock_handle) {
1052                 lockh.cookie = it->d.lustre.it_lock_handle;
1053                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1054         } else {
1055                 fid_build_reg_res_name(fid, &res_id);
1056                 switch (it->it_op) {
1057                 case IT_GETATTR:
1058                         /* File attributes are held under multiple bits:
1059                          * nlink is under lookup lock, size and times are
1060                          * under UPDATE lock and recently we've also got
1061                          * a separate permissions lock for owner/group/acl that
1062                          * were protected by lookup lock before.
1063                          * Getattr must provide all of that information,
1064                          * so we need to ensure we have all of those locks.
1065                          * Unfortunately, if the bits are split across multiple
1066                          * locks, there's no easy way to match all of them here,
1067                          * so an extra RPC would be performed to fetch all
1068                          * of those bits at once for now. */
1069                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1070                          * but for old MDTs (< 2.4), permission is covered
1071                          * by LOOKUP lock, so it needs to match all bits here.*/
1072                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1073                                                   MDS_INODELOCK_LOOKUP |
1074                                                   MDS_INODELOCK_PERM;
1075                         break;
1076                 case IT_READDIR:
1077                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1078                         break;
1079                 case IT_LAYOUT:
1080                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1081                         break;
1082                 default:
1083                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1084                         break;
1085                 }
1086
1087                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1088                                       LDLM_IBITS, &policy,
1089                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1090                                       &lockh);
1091         }
1092
1093         if (mode) {
1094                 it->d.lustre.it_lock_handle = lockh.cookie;
1095                 it->d.lustre.it_lock_mode = mode;
1096         } else {
1097                 it->d.lustre.it_lock_handle = 0;
1098                 it->d.lustre.it_lock_mode = 0;
1099         }
1100
1101         RETURN(!!mode);
1102 }
1103
1104 /*
1105  * This long block is all about fixing up the lock and request state
1106  * so that it is correct as of the moment _before_ the operation was
1107  * applied; that way, the VFS will think that everything is normal and
1108  * call Lustre's regular VFS methods.
1109  *
1110  * If we're performing a creation, that means that unless the creation
1111  * failed with EEXIST, we should fake up a negative dentry.
1112  *
1113  * For everything else, we want to lookup to succeed.
1114  *
1115  * One additional note: if CREATE or OPEN succeeded, we add an extra
1116  * reference to the request because we need to keep it around until
1117  * ll_create/ll_open gets called.
1118  *
1119  * The server will return to us, in it_disposition, an indication of
1120  * exactly what d.lustre.it_status refers to.
1121  *
1122  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1123  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1124  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1125  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1126  * was successful.
1127  *
1128  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1129  * child lookup.
1130  */
1131 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1132                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1133                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1134 {
1135         struct ldlm_enqueue_info einfo = {
1136                 .ei_type        = LDLM_IBITS,
1137                 .ei_mode        = it_to_lock_mode(it),
1138                 .ei_cb_bl       = cb_blocking,
1139                 .ei_cb_cp       = ldlm_completion_ast,
1140         };
1141         struct lustre_handle lockh;
1142         int rc = 0;
1143         ENTRY;
1144         LASSERT(it);
1145
1146         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1147                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1148                 op_data->op_name, PFID(&op_data->op_fid2),
1149                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1150                 it->it_flags);
1151
1152         lockh.cookie = 0;
1153         if (fid_is_sane(&op_data->op_fid2) &&
1154             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1155                 /* We could just return 1 immediately, but since we should only
1156                  * be called in revalidate_it if we already have a lock, let's
1157                  * verify that. */
1158                 it->d.lustre.it_lock_handle = 0;
1159                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1160                 /* Only return failure if it was not GETATTR by cfid
1161                    (from inode_revalidate) */
1162                 if (rc || op_data->op_namelen != 0)
1163                         RETURN(rc);
1164         }
1165
1166         /* For case if upper layer did not alloc fid, do it now. */
1167         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1168                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1169                 if (rc < 0) {
1170                         CERROR("Can't alloc new fid, rc %d\n", rc);
1171                         RETURN(rc);
1172                 }
1173         }
1174
1175         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1176                          extra_lock_flags);
1177         if (rc < 0)
1178                 RETURN(rc);
1179
1180         *reqp = it->d.lustre.it_data;
1181         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1182         RETURN(rc);
1183 }
1184
1185 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1186                                               struct ptlrpc_request *req,
1187                                               void *args, int rc)
1188 {
1189         struct mdc_getattr_args  *ga = args;
1190         struct obd_export        *exp = ga->ga_exp;
1191         struct md_enqueue_info   *minfo = ga->ga_minfo;
1192         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1193         struct lookup_intent     *it;
1194         struct lustre_handle     *lockh;
1195         struct obd_device        *obddev;
1196         struct ldlm_reply        *lockrep;
1197         __u64                     flags = LDLM_FL_HAS_INTENT;
1198         ENTRY;
1199
1200         it    = &minfo->mi_it;
1201         lockh = &minfo->mi_lockh;
1202
1203         obddev = class_exp2obd(exp);
1204
1205         obd_put_request_slot(&obddev->u.cli);
1206         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1207                 rc = -ETIMEDOUT;
1208
1209         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1210                                    &flags, NULL, 0, lockh, rc);
1211         if (rc < 0) {
1212                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1213                 mdc_clear_replay_flag(req, rc);
1214                 GOTO(out, rc);
1215         }
1216
1217         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1218         LASSERT(lockrep != NULL);
1219
1220         lockrep->lock_policy_res2 =
1221                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1222
1223         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1224         if (rc)
1225                 GOTO(out, rc);
1226
1227         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1228         EXIT;
1229
1230 out:
1231         OBD_FREE_PTR(einfo);
1232         minfo->mi_cb(req, minfo, rc);
1233         return 0;
1234 }
1235
1236 int mdc_intent_getattr_async(struct obd_export *exp,
1237                              struct md_enqueue_info *minfo,
1238                              struct ldlm_enqueue_info *einfo)
1239 {
1240         struct md_op_data       *op_data = &minfo->mi_data;
1241         struct lookup_intent    *it = &minfo->mi_it;
1242         struct ptlrpc_request   *req;
1243         struct mdc_getattr_args *ga;
1244         struct obd_device       *obddev = class_exp2obd(exp);
1245         struct ldlm_res_id       res_id;
1246         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1247          *     for statahead currently. Consider CMD in future, such two bits
1248          *     maybe managed by different MDS, should be adjusted then. */
1249         ldlm_policy_data_t       policy = {
1250                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1251                                                          MDS_INODELOCK_UPDATE }
1252                                  };
1253         int                      rc = 0;
1254         __u64                    flags = LDLM_FL_HAS_INTENT;
1255         ENTRY;
1256
1257         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1258                 LPF64"o\n",
1259                 (int)op_data->op_namelen, op_data->op_name,
1260                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1261
1262         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1263         req = mdc_intent_getattr_pack(exp, it, op_data);
1264         if (IS_ERR(req))
1265                 RETURN(PTR_ERR(req));
1266
1267         rc = obd_get_request_slot(&obddev->u.cli);
1268         if (rc != 0) {
1269                 ptlrpc_req_finished(req);
1270                 RETURN(rc);
1271         }
1272
1273         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1274                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1275         if (rc < 0) {
1276                 obd_put_request_slot(&obddev->u.cli);
1277                 ptlrpc_req_finished(req);
1278                 RETURN(rc);
1279         }
1280
1281         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1282         ga = ptlrpc_req_async_args(req);
1283         ga->ga_exp = exp;
1284         ga->ga_minfo = minfo;
1285         ga->ga_einfo = einfo;
1286
1287         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1288         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1289
1290         RETURN(0);
1291 }