Whamcloud - gitweb
LU-3105 mdc: remove capa support
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40 #include <obd.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
49
/*
 * Bundle of the three pointers that travel together with a getattr
 * enqueue.  NOTE(review): presumably handed to the asynchronous getattr
 * completion path; the consumer is not visible in this part of the file.
 */
struct mdc_getattr_args {
        /* export the request is issued on */
        struct obd_export *ga_exp;
        /* metadata enqueue information */
        struct md_enqueue_info *ga_minfo;
        /* LDLM enqueue information */
        struct ldlm_enqueue_info *ga_einfo;
};
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_LEASE)) {
59                 if (phase >= DISP_OPEN_LEASE)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
100                       __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!*lockh)
110                 RETURN(0);
111
112         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137                            const struct lu_fid *fid, ldlm_type_t type,
138                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
139                            struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         ldlm_mode_t rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp,
154                       const struct lu_fid *fid,
155                       ldlm_policy_data_t *policy,
156                       ldlm_mode_t mode,
157                       ldlm_cancel_flags_t flags,
158                       void *opaque)
159 {
160         struct ldlm_res_id res_id;
161         struct obd_device *obd = class_exp2obd(exp);
162         int rc;
163
164         ENTRY;
165
166         fid_build_reg_res_name(fid, &res_id);
167         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168                                              policy, mode, flags, opaque);
169         RETURN(rc);
170 }
171
172 int mdc_null_inode(struct obd_export *exp,
173                    const struct lu_fid *fid)
174 {
175         struct ldlm_res_id res_id;
176         struct ldlm_resource *res;
177         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
178         ENTRY;
179
180         LASSERTF(ns != NULL, "no namespace passed\n");
181
182         fid_build_reg_res_name(fid, &res_id);
183
184         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185         if (IS_ERR(res))
186                 RETURN(0);
187
188         lock_res(res);
189         res->lr_lvb_inode = NULL;
190         unlock_res(res);
191
192         ldlm_resource_putref(res);
193         RETURN(0);
194 }
195
196 /* find any ldlm lock of the inode in mdc
197  * return 0    not find
198  *        1    find one
199  *      < 0    error */
200 int mdc_find_cbdata(struct obd_export *exp,
201                     const struct lu_fid *fid,
202                     ldlm_iterator_t it, void *data)
203 {
204         struct ldlm_res_id res_id;
205         int rc = 0;
206         ENTRY;
207
208         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
210                                    it, data);
211         if (rc == LDLM_ITER_STOP)
212                 RETURN(1);
213         else if (rc == LDLM_ITER_CONTINUE)
214                 RETURN(0);
215         RETURN(rc);
216 }
217
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
219 {
220         /* Don't hold error requests for replay. */
221         if (req->rq_replay) {
222                 spin_lock(&req->rq_lock);
223                 req->rq_replay = 0;
224                 spin_unlock(&req->rq_lock);
225         }
226         if (rc && req->rq_transno != 0) {
227                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
228                 LBUG();
229         }
230 }
231
232 /* Save a large LOV EA into the request buffer so that it is available
233  * for replay.  We don't do this in the initial request because the
234  * original request doesn't need this buffer (at most it sends just the
235  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236  * buffer and may also be difficult to allocate and save a very large
237  * request buffer for each open. (bug 5707)
238  *
239  * OOM here may cause recovery failure if lmm is needed (only for the
240  * original open if the MDS crashed just when this client also OOM'd)
241  * but this is incredibly unlikely, and questionable whether the client
242  * could do MDS recovery under OOM anyways... */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244                                 struct mdt_body *body)
245 {
246         int     rc;
247
248         /* FIXME: remove this explicit offset. */
249         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250                                         body->mbo_eadatasize);
251         if (rc) {
252                 CERROR("Can't enlarge segment %d size to %d\n",
253                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255                 body->mbo_eadatasize = 0;
256         }
257 }
258
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261                      struct md_op_data *op_data)
262 {
263         struct ptlrpc_request   *req;
264         struct obd_device       *obddev = class_exp2obd(exp);
265         struct ldlm_intent      *lit;
266         const void              *lmm = op_data->op_data;
267         __u32                    lmmsize = op_data->op_data_size;
268         struct list_head         cancels = LIST_HEAD_INIT(cancels);
269         int                      count = 0;
270         int                      mode;
271         int                      rc;
272         ENTRY;
273
274         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
275
276         /* XXX: openlock is not cancelled for cross-refs. */
277         /* If inode is known, cancel conflicting OPEN locks. */
278         if (fid_is_sane(&op_data->op_fid2)) {
279                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280                         if (it->it_flags & FMODE_WRITE)
281                                 mode = LCK_EX;
282                         else
283                                 mode = LCK_PR;
284                 } else {
285                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
286                                 mode = LCK_CW;
287 #ifdef FMODE_EXEC
288                         else if (it->it_flags & FMODE_EXEC)
289                                 mode = LCK_PR;
290 #endif
291                         else
292                                 mode = LCK_CR;
293                 }
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
316                              op_data->op_namelen + 1);
317         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
318                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
319
320         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
321         if (rc < 0) {
322                 ptlrpc_request_free(req);
323                 RETURN(ERR_PTR(rc));
324         }
325
326         spin_lock(&req->rq_lock);
327         req->rq_replay = req->rq_import->imp_replayable;
328         spin_unlock(&req->rq_lock);
329
330         /* pack the intent */
331         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332         lit->opc = (__u64)it->it_op;
333
334         /* pack the intended request */
335         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
336                       lmmsize);
337
338         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339                              obddev->u.cli.cl_max_mds_easize);
340
341         /* for remote client, fetch remote perm for current user */
342         if (client_is_remote(exp))
343                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
344                                      sizeof(struct mdt_remote_perm));
345         ptlrpc_request_set_replen(req);
346         return req;
347 }
348
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351                          struct lookup_intent *it,
352                          struct md_op_data *op_data)
353 {
354         struct ptlrpc_request   *req;
355         struct ldlm_intent      *lit;
356         int                     rc, count = 0;
357         __u32                   maxdata;
358         struct list_head        cancels = LIST_HEAD_INIT(cancels);
359
360         ENTRY;
361
362         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363                                         &RQF_LDLM_INTENT_GETXATTR);
364         if (req == NULL)
365                 RETURN(ERR_PTR(-ENOMEM));
366
367         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
368         if (rc) {
369                 ptlrpc_request_free(req);
370                 RETURN(ERR_PTR(rc));
371         }
372
373         /* pack the intent */
374         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
375         lit->opc = IT_GETXATTR;
376
377         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
378
379         /* pack the intended request */
380         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
381                       0);
382
383         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
384                                 RCL_SERVER, maxdata);
385
386         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
387                                 RCL_SERVER, maxdata);
388
389         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
390                                 RCL_SERVER, maxdata);
391
392         ptlrpc_request_set_replen(req);
393
394         RETURN(req);
395 }
396
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398                                                      struct lookup_intent *it,
399                                                      struct md_op_data *op_data)
400 {
401         struct ptlrpc_request *req;
402         struct obd_device     *obddev = class_exp2obd(exp);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_UNLINK);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413                              op_data->op_namelen + 1);
414
415         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
416         if (rc) {
417                 ptlrpc_request_free(req);
418                 RETURN(ERR_PTR(rc));
419         }
420
421         /* pack the intent */
422         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423         lit->opc = (__u64)it->it_op;
424
425         /* pack the intended request */
426         mdc_unlink_pack(req, op_data);
427
428         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429                              obddev->u.cli.cl_default_mds_easize);
430         ptlrpc_request_set_replen(req);
431         RETURN(req);
432 }
433
434 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
435                                                       struct lookup_intent *it,
436                                                       struct md_op_data *op_data)
437 {
438         struct ptlrpc_request   *req;
439         struct obd_device       *obddev = class_exp2obd(exp);
440         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442                                          OBD_MD_MEA |
443                                          (client_is_remote(exp) ?
444                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
445         struct ldlm_intent      *lit;
446         int                      rc;
447         __u32                    easize;
448         ENTRY;
449
450         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
451                                    &RQF_LDLM_INTENT_GETATTR);
452         if (req == NULL)
453                 RETURN(ERR_PTR(-ENOMEM));
454
455         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
456                              op_data->op_namelen + 1);
457
458         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
459         if (rc) {
460                 ptlrpc_request_free(req);
461                 RETURN(ERR_PTR(rc));
462         }
463
464         /* pack the intent */
465         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
466         lit->opc = (__u64)it->it_op;
467
468         if (obddev->u.cli.cl_default_mds_easize > 0)
469                 easize = obddev->u.cli.cl_default_mds_easize;
470         else
471                 easize = obddev->u.cli.cl_max_mds_easize;
472
473         /* pack the intended request */
474         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
475
476         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
477         if (client_is_remote(exp))
478                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
479                                      sizeof(struct mdt_remote_perm));
480         ptlrpc_request_set_replen(req);
481         RETURN(req);
482 }
483
484 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
485                                                      struct lookup_intent *it,
486                                                      struct md_op_data *unused)
487 {
488         struct obd_device     *obd = class_exp2obd(exp);
489         struct ptlrpc_request *req;
490         struct ldlm_intent    *lit;
491         struct layout_intent  *layout;
492         int rc;
493         ENTRY;
494
495         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
496                                 &RQF_LDLM_INTENT_LAYOUT);
497         if (req == NULL)
498                 RETURN(ERR_PTR(-ENOMEM));
499
500         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
501         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
502         if (rc) {
503                 ptlrpc_request_free(req);
504                 RETURN(ERR_PTR(rc));
505         }
506
507         /* pack the intent */
508         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
509         lit->opc = (__u64)it->it_op;
510
511         /* pack the layout intent request */
512         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
513         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
514          * set for replication */
515         layout->li_opc = LAYOUT_INTENT_ACCESS;
516
517         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
518                              obd->u.cli.cl_default_mds_easize);
519         ptlrpc_request_set_replen(req);
520         RETURN(req);
521 }
522
523 static struct ptlrpc_request *
524 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
525 {
526         struct ptlrpc_request *req;
527         int rc;
528         ENTRY;
529
530         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
531         if (req == NULL)
532                 RETURN(ERR_PTR(-ENOMEM));
533
534         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
535         if (rc) {
536                 ptlrpc_request_free(req);
537                 RETURN(ERR_PTR(rc));
538         }
539
540         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
541         ptlrpc_request_set_replen(req);
542         RETURN(req);
543 }
544
545 static int mdc_finish_enqueue(struct obd_export *exp,
546                               struct ptlrpc_request *req,
547                               struct ldlm_enqueue_info *einfo,
548                               struct lookup_intent *it,
549                               struct lustre_handle *lockh,
550                               int rc)
551 {
552         struct req_capsule  *pill = &req->rq_pill;
553         struct ldlm_request *lockreq;
554         struct ldlm_reply   *lockrep;
555         struct lustre_intent_data *intent = &it->d.lustre;
556         struct ldlm_lock    *lock;
557         void                *lvb_data = NULL;
558         __u32                lvb_len = 0;
559         ENTRY;
560
561         LASSERT(rc >= 0);
562         /* Similarly, if we're going to replay this request, we don't want to
563          * actually get a lock, just perform the intent. */
564         if (req->rq_transno || req->rq_replay) {
565                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
566                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
567         }
568
569         if (rc == ELDLM_LOCK_ABORTED) {
570                 einfo->ei_mode = 0;
571                 memset(lockh, 0, sizeof(*lockh));
572                 rc = 0;
573         } else { /* rc = 0 */
574                 lock = ldlm_handle2lock(lockh);
575                 LASSERT(lock != NULL);
576
577                 /* If the server gave us back a different lock mode, we should
578                  * fix up our variables. */
579                 if (lock->l_req_mode != einfo->ei_mode) {
580                         ldlm_lock_addref(lockh, lock->l_req_mode);
581                         ldlm_lock_decref(lockh, einfo->ei_mode);
582                         einfo->ei_mode = lock->l_req_mode;
583                 }
584                 LDLM_LOCK_PUT(lock);
585         }
586
587         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
588         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
589
590         intent->it_disposition = (int)lockrep->lock_policy_res1;
591         intent->it_status = (int)lockrep->lock_policy_res2;
592         intent->it_lock_mode = einfo->ei_mode;
593         intent->it_lock_handle = lockh->cookie;
594         intent->it_data = req;
595
596         /* Technically speaking rq_transno must already be zero if
597          * it_status is in error, so the check is a bit redundant */
598         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
599                 mdc_clear_replay_flag(req, intent->it_status);
600
601         /* If we're doing an IT_OPEN which did not result in an actual
602          * successful open, then we need to remove the bit which saves
603          * this request for unconditional replay.
604          *
605          * It's important that we do this first!  Otherwise we might exit the
606          * function without doing so, and try to replay a failed create
607          * (bug 3440) */
608         if (it->it_op & IT_OPEN && req->rq_replay &&
609             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
610                 mdc_clear_replay_flag(req, intent->it_status);
611
612         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
613                   it->it_op, intent->it_disposition, intent->it_status);
614
615         /* We know what to expect, so we do any byte flipping required here */
616         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
617                 struct mdt_body *body;
618
619                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
620                 if (body == NULL) {
621                         CERROR ("Can't swab mdt_body\n");
622                         RETURN (-EPROTO);
623                 }
624
625                 if (it_disposition(it, DISP_OPEN_OPEN) &&
626                     !it_open_error(DISP_OPEN_OPEN, it)) {
627                         /*
628                          * If this is a successful OPEN request, we need to set
629                          * replay handler and data early, so that if replay
630                          * happens immediately after swabbing below, new reply
631                          * is swabbed by that handler correctly.
632                          */
633                         mdc_set_open_replay_data(NULL, NULL, it);
634                 }
635
636                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
637                         void *eadata;
638
639                         mdc_update_max_ea_from_body(exp, body);
640
641                         /*
642                          * The eadata is opaque; just check that it is there.
643                          * Eventually, obd_unpackmd() will check the contents.
644                          */
645                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
646                                                         body->mbo_eadatasize);
647                         if (eadata == NULL)
648                                 RETURN(-EPROTO);
649
650                         /* save lvb data and length in case this is for layout
651                          * lock */
652                         lvb_data = eadata;
653                         lvb_len = body->mbo_eadatasize;
654
655                         /*
656                          * We save the reply LOV EA in case we have to replay a
657                          * create for recovery.  If we didn't allocate a large
658                          * enough request buffer above we need to reallocate it
659                          * here to hold the actual LOV EA.
660                          *
661                          * To not save LOV EA if request is not going to replay
662                          * (for example error one).
663                          */
664                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
665                                 void *lmm;
666                                 if (req_capsule_get_size(pill, &RMF_EADATA,
667                                                          RCL_CLIENT) <
668                                     body->mbo_eadatasize)
669                                         mdc_realloc_openmsg(req, body);
670                                 else
671                                         req_capsule_shrink(pill, &RMF_EADATA,
672                                                            body->mbo_eadatasize,
673                                                            RCL_CLIENT);
674
675                                 req_capsule_set_size(pill, &RMF_EADATA,
676                                                      RCL_CLIENT,
677                                                      body->mbo_eadatasize);
678
679                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
680                                 if (lmm)
681                                         memcpy(lmm, eadata,
682                                                body->mbo_eadatasize);
683                         }
684                 }
685
686                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
687                         struct mdt_remote_perm *perm;
688
689                         LASSERT(client_is_remote(exp));
690                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
691                                                 lustre_swab_mdt_remote_perm);
692                         if (perm == NULL)
693                                 RETURN(-EPROTO);
694                 }
695         } else if (it->it_op & IT_LAYOUT) {
696                 /* maybe the lock was granted right away and layout
697                  * is packed into RMF_DLM_LVB of req */
698                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
699                 if (lvb_len > 0) {
700                         lvb_data = req_capsule_server_sized_get(pill,
701                                                         &RMF_DLM_LVB, lvb_len);
702                         if (lvb_data == NULL)
703                                 RETURN(-EPROTO);
704                 }
705         }
706
707         /* fill in stripe data for layout lock.
708          * LU-6581: trust layout data only if layout lock is granted. The MDT
709          * has stopped sending layout unless the layout lock is granted. The
710          * client still does this checking in case it's talking with an old
711          * server. - Jinshan */
712         lock = ldlm_handle2lock(lockh);
713         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
714             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
715                 void *lmm;
716
717                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
718                         ldlm_it2str(it->it_op), lvb_len);
719
720                 OBD_ALLOC_LARGE(lmm, lvb_len);
721                 if (lmm == NULL) {
722                         LDLM_LOCK_PUT(lock);
723                         RETURN(-ENOMEM);
724                 }
725                 memcpy(lmm, lvb_data, lvb_len);
726
727                 /* install lvb_data */
728                 lock_res_and_lock(lock);
729                 if (lock->l_lvb_data == NULL) {
730                         lock->l_lvb_type = LVB_T_LAYOUT;
731                         lock->l_lvb_data = lmm;
732                         lock->l_lvb_len = lvb_len;
733                         lmm = NULL;
734                 }
735                 unlock_res_and_lock(lock);
736                 if (lmm != NULL)
737                         OBD_FREE_LARGE(lmm, lvb_len);
738         }
739         if (lock != NULL)
740                 LDLM_LOCK_PUT(lock);
741
742         RETURN(rc);
743 }
744
745 /* We always reserve enough space in the reply packet for a stripe MD, because
746  * we don't know in advance the file type. */
747 int mdc_enqueue(struct obd_export *exp,
748                 struct ldlm_enqueue_info *einfo,
749                 const union ldlm_policy_data *policy,
750                 struct lookup_intent *it, struct md_op_data *op_data,
751                 struct lustre_handle *lockh, __u64 extra_lock_flags)
752 {
753         struct obd_device     *obddev = class_exp2obd(exp);
754         struct ptlrpc_request *req = NULL;
755         __u64                  flags, saved_flags = extra_lock_flags;
756         int                    rc;
757         struct ldlm_res_id res_id;
758         static const ldlm_policy_data_t lookup_policy =
759                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
760         static const ldlm_policy_data_t update_policy =
761                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
762         static const ldlm_policy_data_t layout_policy =
763                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
764         static const ldlm_policy_data_t getxattr_policy = {
765                               .l_inodebits = { MDS_INODELOCK_XATTR } };
766         int                    generation, resends = 0;
767         struct ldlm_reply     *lockrep;
768         enum lvb_type          lvb_type = 0;
769         ENTRY;
770
771         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
772                  einfo->ei_type);
773         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
774
775         if (it != NULL) {
776                 LASSERT(policy == NULL);
777
778                 saved_flags |= LDLM_FL_HAS_INTENT;
779                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
780                         policy = &update_policy;
781                 else if (it->it_op & IT_LAYOUT)
782                         policy = &layout_policy;
783                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
784                         policy = &getxattr_policy;
785                 else
786                         policy = &lookup_policy;
787         }
788
789         generation = obddev->u.cli.cl_import->imp_generation;
790 resend:
791         flags = saved_flags;
792         if (it == NULL) {
793                 /* The only way right now is FLOCK. */
794                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
795                          einfo->ei_type);
796                 res_id.name[3] = LDLM_FLOCK;
797         } else if (it->it_op & IT_OPEN) {
798                 req = mdc_intent_open_pack(exp, it, op_data);
799         } else if (it->it_op & IT_UNLINK) {
800                 req = mdc_intent_unlink_pack(exp, it, op_data);
801         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
802                 req = mdc_intent_getattr_pack(exp, it, op_data);
803         } else if (it->it_op & IT_READDIR) {
804                 req = mdc_enqueue_pack(exp, 0);
805         } else if (it->it_op & IT_LAYOUT) {
806                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
807                         RETURN(-EOPNOTSUPP);
808                 req = mdc_intent_layout_pack(exp, it, op_data);
809                 lvb_type = LVB_T_LAYOUT;
810         } else if (it->it_op & IT_GETXATTR) {
811                 req = mdc_intent_getxattr_pack(exp, it, op_data);
812         } else {
813                 LBUG();
814                 RETURN(-EINVAL);
815         }
816
817         if (IS_ERR(req))
818                 RETURN(PTR_ERR(req));
819
820         if (req != NULL && it && it->it_op & IT_CREAT)
821                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
822                  * retry logic */
823                 req->rq_no_retry_einprogress = 1;
824
825         if (resends) {
826                 req->rq_generation_set = 1;
827                 req->rq_import_generation = generation;
828                 req->rq_sent = cfs_time_current_sec() + resends;
829         }
830
831         /* It is important to obtain rpc_lock first (if applicable), so that
832          * threads that are serialised with rpc_lock are not polluting our
833          * rpcs in flight counter. We do not do flock request limiting, though*/
834         if (it) {
835                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
836                 rc = obd_get_request_slot(&obddev->u.cli);
837                 if (rc != 0) {
838                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
839                         mdc_clear_replay_flag(req, 0);
840                         ptlrpc_req_finished(req);
841                         RETURN(rc);
842                 }
843         }
844
845         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
846                               0, lvb_type, lockh, 0);
847         if (!it) {
848                 /* For flock requests we immediatelly return without further
849                    delay and let caller deal with the rest, since rest of
850                    this function metadata processing makes no sense for flock
851                    requests anyway. But in case of problem during comms with
852                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
853                    can not rely on caller and this mainly for F_UNLCKs
854                    (explicits or automatically generated by Kernel to clean
855                    current FLocks upon exit) that can't be trashed */
856                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
857                     (einfo->ei_type == LDLM_FLOCK) &&
858                     (einfo->ei_mode == LCK_NL))
859                         goto resend;
860                 RETURN(rc);
861         }
862
863         obd_put_request_slot(&obddev->u.cli);
864         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
865
866         if (rc < 0) {
867                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
868                        obddev->obd_name, rc);
869
870                 mdc_clear_replay_flag(req, rc);
871                 ptlrpc_req_finished(req);
872                 RETURN(rc);
873         }
874
875         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
876         LASSERT(lockrep != NULL);
877
878         lockrep->lock_policy_res2 =
879                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
880
881         /* Retry the create infinitely when we get -EINPROGRESS from
882          * server. This is required by the new quota design. */
883         if (it && it->it_op & IT_CREAT &&
884             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
885                 mdc_clear_replay_flag(req, rc);
886                 ptlrpc_req_finished(req);
887                 resends++;
888
889                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
890                        obddev->obd_name, resends, it->it_op,
891                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
892
893                 if (generation == obddev->u.cli.cl_import->imp_generation) {
894                         goto resend;
895                 } else {
896                         CDEBUG(D_HA, "resend cross eviction\n");
897                         RETURN(-EIO);
898                 }
899         }
900
901         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
902         if (rc < 0) {
903                 if (lustre_handle_is_used(lockh)) {
904                         ldlm_lock_decref(lockh, einfo->ei_mode);
905                         memset(lockh, 0, sizeof(*lockh));
906                 }
907                 ptlrpc_req_finished(req);
908
909                 it->d.lustre.it_lock_handle = 0;
910                 it->d.lustre.it_lock_mode = 0;
911                 it->d.lustre.it_data = NULL;
912         }
913
914         RETURN(rc);
915 }
916
917 static int mdc_finish_intent_lock(struct obd_export *exp,
918                                   struct ptlrpc_request *request,
919                                   struct md_op_data *op_data,
920                                   struct lookup_intent *it,
921                                   struct lustre_handle *lockh)
922 {
923         struct lustre_handle old_lock;
924         struct mdt_body *mdt_body;
925         struct ldlm_lock *lock;
926         int rc;
927         ENTRY;
928
929         LASSERT(request != NULL);
930         LASSERT(request != LP_POISON);
931         LASSERT(request->rq_repmsg != LP_POISON);
932
933         if (it->it_op & IT_READDIR)
934                 RETURN(0);
935
936         if (!it_disposition(it, DISP_IT_EXECD)) {
937                 /* The server failed before it even started executing the
938                  * intent, i.e. because it couldn't unpack the request. */
939                 LASSERT(it->d.lustre.it_status != 0);
940                 RETURN(it->d.lustre.it_status);
941         }
942         rc = it_open_error(DISP_IT_EXECD, it);
943         if (rc)
944                 RETURN(rc);
945
946         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
947         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
948
949         rc = it_open_error(DISP_LOOKUP_EXECD, it);
950         if (rc)
951                 RETURN(rc);
952
953         /* keep requests around for the multiple phases of the call
954          * this shows the DISP_XX must guarantee we make it into the call
955          */
956         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
957             it_disposition(it, DISP_OPEN_CREATE) &&
958             !it_open_error(DISP_OPEN_CREATE, it)) {
959                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
960                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
961         }
962         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
963             it_disposition(it, DISP_OPEN_OPEN) &&
964             !it_open_error(DISP_OPEN_OPEN, it)) {
965                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
966                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
967                 /* BUG 11546 - eviction in the middle of open rpc processing */
968                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
969         }
970
971         if (it->it_op & IT_CREAT) {
972                 /* XXX this belongs in ll_create_it */
973         } else if (it->it_op == IT_OPEN) {
974                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
975         } else {
976                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
977         }
978
979         /* If we already have a matching lock, then cancel the new
980          * one.  We have to set the data here instead of in
981          * mdc_enqueue, because we need to use the child's inode as
982          * the l_ast_data to match, and that's not available until
983          * intent_finish has performed the iget().) */
984         lock = ldlm_handle2lock(lockh);
985         if (lock) {
986                 ldlm_policy_data_t policy = lock->l_policy_data;
987                 LDLM_DEBUG(lock, "matching against this");
988
989                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
990                                          &lock->l_resource->lr_name),
991                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
992                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
993                 LDLM_LOCK_PUT(lock);
994
995                 memcpy(&old_lock, lockh, sizeof(*lockh));
996                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
997                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
998                         ldlm_lock_decref_and_cancel(lockh,
999                                                     it->d.lustre.it_lock_mode);
1000                         memcpy(lockh, &old_lock, sizeof(old_lock));
1001                         it->d.lustre.it_lock_handle = lockh->cookie;
1002                 }
1003         }
1004         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1005                 (int)op_data->op_namelen, op_data->op_name,
1006                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1007                 it->d.lustre.it_disposition, rc);
1008         RETURN(rc);
1009 }
1010
1011 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1012                         struct lu_fid *fid, __u64 *bits)
1013 {
1014         /* We could just return 1 immediately, but since we should only
1015          * be called in revalidate_it if we already have a lock, let's
1016          * verify that. */
1017         struct ldlm_res_id res_id;
1018         struct lustre_handle lockh;
1019         ldlm_policy_data_t policy;
1020         ldlm_mode_t mode;
1021         ENTRY;
1022
1023         if (it->d.lustre.it_lock_handle) {
1024                 lockh.cookie = it->d.lustre.it_lock_handle;
1025                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1026         } else {
1027                 fid_build_reg_res_name(fid, &res_id);
1028                 switch (it->it_op) {
1029                 case IT_GETATTR:
1030                         /* File attributes are held under multiple bits:
1031                          * nlink is under lookup lock, size and times are
1032                          * under UPDATE lock and recently we've also got
1033                          * a separate permissions lock for owner/group/acl that
1034                          * were protected by lookup lock before.
1035                          * Getattr must provide all of that information,
1036                          * so we need to ensure we have all of those locks.
1037                          * Unfortunately, if the bits are split across multiple
1038                          * locks, there's no easy way to match all of them here,
1039                          * so an extra RPC would be performed to fetch all
1040                          * of those bits at once for now. */
1041                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1042                          * but for old MDTs (< 2.4), permission is covered
1043                          * by LOOKUP lock, so it needs to match all bits here.*/
1044                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1045                                                   MDS_INODELOCK_LOOKUP |
1046                                                   MDS_INODELOCK_PERM;
1047                         break;
1048                 case IT_READDIR:
1049                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1050                         break;
1051                 case IT_LAYOUT:
1052                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1053                         break;
1054                 default:
1055                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1056                         break;
1057                 }
1058
1059                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1060                                       LDLM_IBITS, &policy,
1061                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1062                                       &lockh);
1063         }
1064
1065         if (mode) {
1066                 it->d.lustre.it_lock_handle = lockh.cookie;
1067                 it->d.lustre.it_lock_mode = mode;
1068         } else {
1069                 it->d.lustre.it_lock_handle = 0;
1070                 it->d.lustre.it_lock_mode = 0;
1071         }
1072
1073         RETURN(!!mode);
1074 }
1075
1076 /*
1077  * This long block is all about fixing up the lock and request state
1078  * so that it is correct as of the moment _before_ the operation was
1079  * applied; that way, the VFS will think that everything is normal and
1080  * call Lustre's regular VFS methods.
1081  *
1082  * If we're performing a creation, that means that unless the creation
1083  * failed with EEXIST, we should fake up a negative dentry.
1084  *
1085  * For everything else, we want to lookup to succeed.
1086  *
1087  * One additional note: if CREATE or OPEN succeeded, we add an extra
1088  * reference to the request because we need to keep it around until
1089  * ll_create/ll_open gets called.
1090  *
1091  * The server will return to us, in it_disposition, an indication of
1092  * exactly what d.lustre.it_status refers to.
1093  *
1094  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1095  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1096  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1097  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1098  * was successful.
1099  *
1100  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1101  * child lookup.
1102  */
1103 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1104                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1105                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1106 {
1107         struct ldlm_enqueue_info einfo = {
1108                 .ei_type        = LDLM_IBITS,
1109                 .ei_mode        = it_to_lock_mode(it),
1110                 .ei_cb_bl       = cb_blocking,
1111                 .ei_cb_cp       = ldlm_completion_ast,
1112         };
1113         struct lustre_handle lockh;
1114         int rc = 0;
1115         ENTRY;
1116         LASSERT(it);
1117
1118         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1119                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1120                 op_data->op_name, PFID(&op_data->op_fid2),
1121                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1122                 it->it_flags);
1123
1124         lockh.cookie = 0;
1125         if (fid_is_sane(&op_data->op_fid2) &&
1126             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1127                 /* We could just return 1 immediately, but since we should only
1128                  * be called in revalidate_it if we already have a lock, let's
1129                  * verify that. */
1130                 it->d.lustre.it_lock_handle = 0;
1131                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1132                 /* Only return failure if it was not GETATTR by cfid
1133                    (from inode_revalidate) */
1134                 if (rc || op_data->op_namelen != 0)
1135                         RETURN(rc);
1136         }
1137
1138         /* For case if upper layer did not alloc fid, do it now. */
1139         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1140                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1141                 if (rc < 0) {
1142                         CERROR("Can't alloc new fid, rc %d\n", rc);
1143                         RETURN(rc);
1144                 }
1145         }
1146
1147         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1148                          extra_lock_flags);
1149         if (rc < 0)
1150                 RETURN(rc);
1151
1152         *reqp = it->d.lustre.it_data;
1153         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1154         RETURN(rc);
1155 }
1156
1157 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1158                                               struct ptlrpc_request *req,
1159                                               void *args, int rc)
1160 {
1161         struct mdc_getattr_args  *ga = args;
1162         struct obd_export        *exp = ga->ga_exp;
1163         struct md_enqueue_info   *minfo = ga->ga_minfo;
1164         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1165         struct lookup_intent     *it;
1166         struct lustre_handle     *lockh;
1167         struct obd_device        *obddev;
1168         struct ldlm_reply        *lockrep;
1169         __u64                     flags = LDLM_FL_HAS_INTENT;
1170         ENTRY;
1171
1172         it    = &minfo->mi_it;
1173         lockh = &minfo->mi_lockh;
1174
1175         obddev = class_exp2obd(exp);
1176
1177         obd_put_request_slot(&obddev->u.cli);
1178         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1179                 rc = -ETIMEDOUT;
1180
1181         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1182                                    &flags, NULL, 0, lockh, rc);
1183         if (rc < 0) {
1184                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1185                 mdc_clear_replay_flag(req, rc);
1186                 GOTO(out, rc);
1187         }
1188
1189         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1190         LASSERT(lockrep != NULL);
1191
1192         lockrep->lock_policy_res2 =
1193                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1194
1195         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1196         if (rc)
1197                 GOTO(out, rc);
1198
1199         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1200         EXIT;
1201
1202 out:
1203         OBD_FREE_PTR(einfo);
1204         minfo->mi_cb(req, minfo, rc);
1205         return 0;
1206 }
1207
1208 int mdc_intent_getattr_async(struct obd_export *exp,
1209                              struct md_enqueue_info *minfo,
1210                              struct ldlm_enqueue_info *einfo)
1211 {
1212         struct md_op_data       *op_data = &minfo->mi_data;
1213         struct lookup_intent    *it = &minfo->mi_it;
1214         struct ptlrpc_request   *req;
1215         struct mdc_getattr_args *ga;
1216         struct obd_device       *obddev = class_exp2obd(exp);
1217         struct ldlm_res_id       res_id;
1218         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1219          *     for statahead currently. Consider CMD in future, such two bits
1220          *     maybe managed by different MDS, should be adjusted then. */
1221         ldlm_policy_data_t       policy = {
1222                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1223                                                          MDS_INODELOCK_UPDATE }
1224                                  };
1225         int                      rc = 0;
1226         __u64                    flags = LDLM_FL_HAS_INTENT;
1227         ENTRY;
1228
1229         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1230                 LPF64"o\n",
1231                 (int)op_data->op_namelen, op_data->op_name,
1232                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1233
1234         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1235         req = mdc_intent_getattr_pack(exp, it, op_data);
1236         if (IS_ERR(req))
1237                 RETURN(PTR_ERR(req));
1238
1239         rc = obd_get_request_slot(&obddev->u.cli);
1240         if (rc != 0) {
1241                 ptlrpc_req_finished(req);
1242                 RETURN(rc);
1243         }
1244
1245         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1246                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1247         if (rc < 0) {
1248                 obd_put_request_slot(&obddev->u.cli);
1249                 ptlrpc_req_finished(req);
1250                 RETURN(rc);
1251         }
1252
1253         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1254         ga = ptlrpc_req_async_args(req);
1255         ga->ga_exp = exp;
1256         ga->ga_minfo = minfo;
1257         ga->ga_einfo = einfo;
1258
1259         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1260         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1261
1262         RETURN(0);
1263 }