Whamcloud - gitweb
LU-8726 osd-ldiskfs: bypass read for benchmarking
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
218                                 struct mdt_body *body)
219 {
220         int     rc;
221
222         /* FIXME: remove this explicit offset. */
223         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
224                                         body->mbo_eadatasize);
225         if (rc) {
226                 CERROR("Can't enlarge segment %d size to %d\n",
227                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
228                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
229                 body->mbo_eadatasize = 0;
230         }
231 }
232
233 static struct ptlrpc_request *
234 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
235                      struct md_op_data *op_data)
236 {
237         struct ptlrpc_request   *req;
238         struct obd_device       *obddev = class_exp2obd(exp);
239         struct ldlm_intent      *lit;
240         const void              *lmm = op_data->op_data;
241         __u32                    lmmsize = op_data->op_data_size;
242         struct list_head         cancels = LIST_HEAD_INIT(cancels);
243         int                      count = 0;
244         enum ldlm_mode           mode;
245         int                      rc;
246         ENTRY;
247
248         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
249
250         /* XXX: openlock is not cancelled for cross-refs. */
251         /* If inode is known, cancel conflicting OPEN locks. */
252         if (fid_is_sane(&op_data->op_fid2)) {
253                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
254                         if (it->it_flags & FMODE_WRITE)
255                                 mode = LCK_EX;
256                         else
257                                 mode = LCK_PR;
258                 } else {
259                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
260                                 mode = LCK_CW;
261 #ifdef FMODE_EXEC
262                         else if (it->it_flags & FMODE_EXEC)
263                                 mode = LCK_PR;
264 #endif
265                         else
266                                 mode = LCK_CR;
267                 }
268                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
269                                                 &cancels, mode,
270                                                 MDS_INODELOCK_OPEN);
271         }
272
273         /* If CREATE, cancel parent's UPDATE lock. */
274         if (it->it_op & IT_CREAT)
275                 mode = LCK_EX;
276         else
277                 mode = LCK_CR;
278         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
279                                          &cancels, mode,
280                                          MDS_INODELOCK_UPDATE);
281
282         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
283                                    &RQF_LDLM_INTENT_OPEN);
284         if (req == NULL) {
285                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
286                 RETURN(ERR_PTR(-ENOMEM));
287         }
288
289         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
290                              op_data->op_namelen + 1);
291         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
292                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
293
294         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
295                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
296                              strlen(op_data->op_file_secctx_name) + 1 : 0);
297
298         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
299                              op_data->op_file_secctx_size);
300
301         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
302         if (rc < 0) {
303                 ptlrpc_request_free(req);
304                 RETURN(ERR_PTR(rc));
305         }
306
307         spin_lock(&req->rq_lock);
308         req->rq_replay = req->rq_import->imp_replayable;
309         spin_unlock(&req->rq_lock);
310
311         /* pack the intent */
312         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
313         lit->opc = (__u64)it->it_op;
314
315         /* pack the intended request */
316         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
317                       lmmsize);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
320                              obddev->u.cli.cl_max_mds_easize);
321         ptlrpc_request_set_replen(req);
322         return req;
323 }
324
325 static struct ptlrpc_request *
326 mdc_intent_getxattr_pack(struct obd_export *exp,
327                          struct lookup_intent *it,
328                          struct md_op_data *op_data)
329 {
330         struct ptlrpc_request   *req;
331         struct ldlm_intent      *lit;
332         int                     rc, count = 0;
333         __u32                   maxdata;
334         struct list_head        cancels = LIST_HEAD_INIT(cancels);
335
336         ENTRY;
337
338         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
339                                         &RQF_LDLM_INTENT_GETXATTR);
340         if (req == NULL)
341                 RETURN(ERR_PTR(-ENOMEM));
342
343         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
344         if (rc) {
345                 ptlrpc_request_free(req);
346                 RETURN(ERR_PTR(rc));
347         }
348
349         /* pack the intent */
350         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351         lit->opc = IT_GETXATTR;
352
353         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
354
355         /* pack the intended request */
356         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
357                       0);
358
359         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
360                                 RCL_SERVER, maxdata);
361
362         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
363                                 RCL_SERVER, maxdata);
364
365         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
366                                 RCL_SERVER, maxdata);
367
368         ptlrpc_request_set_replen(req);
369
370         RETURN(req);
371 }
372
373 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
374                                                      struct lookup_intent *it,
375                                                      struct md_op_data *op_data)
376 {
377         struct ptlrpc_request *req;
378         struct obd_device     *obddev = class_exp2obd(exp);
379         struct ldlm_intent    *lit;
380         int                    rc;
381         ENTRY;
382
383         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
384                                    &RQF_LDLM_INTENT_UNLINK);
385         if (req == NULL)
386                 RETURN(ERR_PTR(-ENOMEM));
387
388         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
389                              op_data->op_namelen + 1);
390
391         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
392         if (rc) {
393                 ptlrpc_request_free(req);
394                 RETURN(ERR_PTR(rc));
395         }
396
397         /* pack the intent */
398         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
399         lit->opc = (__u64)it->it_op;
400
401         /* pack the intended request */
402         mdc_unlink_pack(req, op_data);
403
404         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
405                              obddev->u.cli.cl_default_mds_easize);
406         ptlrpc_request_set_replen(req);
407         RETURN(req);
408 }
409
410 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
411                                                       struct lookup_intent *it,
412                                                       struct md_op_data *op_data)
413 {
414         struct ptlrpc_request   *req;
415         struct obd_device       *obddev = class_exp2obd(exp);
416         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
417                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
418                                          OBD_MD_MEA | OBD_MD_FLACL;
419         struct ldlm_intent      *lit;
420         int                      rc;
421         __u32                    easize;
422         ENTRY;
423
424         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
425                                    &RQF_LDLM_INTENT_GETATTR);
426         if (req == NULL)
427                 RETURN(ERR_PTR(-ENOMEM));
428
429         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
430                              op_data->op_namelen + 1);
431
432         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
433         if (rc) {
434                 ptlrpc_request_free(req);
435                 RETURN(ERR_PTR(rc));
436         }
437
438         /* pack the intent */
439         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
440         lit->opc = (__u64)it->it_op;
441
442         if (obddev->u.cli.cl_default_mds_easize > 0)
443                 easize = obddev->u.cli.cl_default_mds_easize;
444         else
445                 easize = obddev->u.cli.cl_max_mds_easize;
446
447         /* pack the intended request */
448         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
449
450         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
451         ptlrpc_request_set_replen(req);
452         RETURN(req);
453 }
454
455 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
456                                                      struct lookup_intent *it,
457                                                      struct md_op_data *unused)
458 {
459         struct obd_device     *obd = class_exp2obd(exp);
460         struct ptlrpc_request *req;
461         struct ldlm_intent    *lit;
462         struct layout_intent  *layout;
463         int rc;
464         ENTRY;
465
466         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467                                 &RQF_LDLM_INTENT_LAYOUT);
468         if (req == NULL)
469                 RETURN(ERR_PTR(-ENOMEM));
470
471         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
472         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
473         if (rc) {
474                 ptlrpc_request_free(req);
475                 RETURN(ERR_PTR(rc));
476         }
477
478         /* pack the intent */
479         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
480         lit->opc = (__u64)it->it_op;
481
482         /* pack the layout intent request */
483         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
484         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
485          * set for replication */
486         layout->li_opc = LAYOUT_INTENT_ACCESS;
487
488         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
489                              obd->u.cli.cl_default_mds_easize);
490         ptlrpc_request_set_replen(req);
491         RETURN(req);
492 }
493
494 static struct ptlrpc_request *
495 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
496 {
497         struct ptlrpc_request *req;
498         int rc;
499         ENTRY;
500
501         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
502         if (req == NULL)
503                 RETURN(ERR_PTR(-ENOMEM));
504
505         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
506         if (rc) {
507                 ptlrpc_request_free(req);
508                 RETURN(ERR_PTR(rc));
509         }
510
511         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
512         ptlrpc_request_set_replen(req);
513         RETURN(req);
514 }
515
516 static int mdc_finish_enqueue(struct obd_export *exp,
517                               struct ptlrpc_request *req,
518                               struct ldlm_enqueue_info *einfo,
519                               struct lookup_intent *it,
520                               struct lustre_handle *lockh,
521                               int rc)
522 {
523         struct req_capsule  *pill = &req->rq_pill;
524         struct ldlm_request *lockreq;
525         struct ldlm_reply   *lockrep;
526         struct ldlm_lock    *lock;
527         void                *lvb_data = NULL;
528         __u32                lvb_len = 0;
529         ENTRY;
530
531         LASSERT(rc >= 0);
532         /* Similarly, if we're going to replay this request, we don't want to
533          * actually get a lock, just perform the intent. */
534         if (req->rq_transno || req->rq_replay) {
535                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
536                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
537         }
538
539         if (rc == ELDLM_LOCK_ABORTED) {
540                 einfo->ei_mode = 0;
541                 memset(lockh, 0, sizeof(*lockh));
542                 rc = 0;
543         } else { /* rc = 0 */
544                 lock = ldlm_handle2lock(lockh);
545                 LASSERT(lock != NULL);
546
547                 /* If the server gave us back a different lock mode, we should
548                  * fix up our variables. */
549                 if (lock->l_req_mode != einfo->ei_mode) {
550                         ldlm_lock_addref(lockh, lock->l_req_mode);
551                         ldlm_lock_decref(lockh, einfo->ei_mode);
552                         einfo->ei_mode = lock->l_req_mode;
553                 }
554                 LDLM_LOCK_PUT(lock);
555         }
556
557         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
558         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
559
560         it->it_disposition = (int)lockrep->lock_policy_res1;
561         it->it_status = (int)lockrep->lock_policy_res2;
562         it->it_lock_mode = einfo->ei_mode;
563         it->it_lock_handle = lockh->cookie;
564         it->it_request = req;
565
566         /* Technically speaking rq_transno must already be zero if
567          * it_status is in error, so the check is a bit redundant */
568         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
569                 mdc_clear_replay_flag(req, it->it_status);
570
571         /* If we're doing an IT_OPEN which did not result in an actual
572          * successful open, then we need to remove the bit which saves
573          * this request for unconditional replay.
574          *
575          * It's important that we do this first!  Otherwise we might exit the
576          * function without doing so, and try to replay a failed create
577          * (bug 3440) */
578         if (it->it_op & IT_OPEN && req->rq_replay &&
579             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
580                 mdc_clear_replay_flag(req, it->it_status);
581
582         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
583                   it->it_op, it->it_disposition, it->it_status);
584
585         /* We know what to expect, so we do any byte flipping required here */
586         if (it_has_reply_body(it)) {
587                 struct mdt_body *body;
588
589                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
590                 if (body == NULL) {
591                         CERROR ("Can't swab mdt_body\n");
592                         RETURN (-EPROTO);
593                 }
594
595                 if (it_disposition(it, DISP_OPEN_OPEN) &&
596                     !it_open_error(DISP_OPEN_OPEN, it)) {
597                         /*
598                          * If this is a successful OPEN request, we need to set
599                          * replay handler and data early, so that if replay
600                          * happens immediately after swabbing below, new reply
601                          * is swabbed by that handler correctly.
602                          */
603                         mdc_set_open_replay_data(NULL, NULL, it);
604                 }
605
606                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
607                         void *eadata;
608
609                         mdc_update_max_ea_from_body(exp, body);
610
611                         /*
612                          * The eadata is opaque; just check that it is there.
613                          * Eventually, obd_unpackmd() will check the contents.
614                          */
615                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
616                                                         body->mbo_eadatasize);
617                         if (eadata == NULL)
618                                 RETURN(-EPROTO);
619
620                         /* save lvb data and length in case this is for layout
621                          * lock */
622                         lvb_data = eadata;
623                         lvb_len = body->mbo_eadatasize;
624
625                         /*
626                          * We save the reply LOV EA in case we have to replay a
627                          * create for recovery.  If we didn't allocate a large
628                          * enough request buffer above we need to reallocate it
629                          * here to hold the actual LOV EA.
630                          *
631                          * To not save LOV EA if request is not going to replay
632                          * (for example error one).
633                          */
634                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
635                                 void *lmm;
636                                 if (req_capsule_get_size(pill, &RMF_EADATA,
637                                                          RCL_CLIENT) <
638                                     body->mbo_eadatasize)
639                                         mdc_realloc_openmsg(req, body);
640                                 else
641                                         req_capsule_shrink(pill, &RMF_EADATA,
642                                                            body->mbo_eadatasize,
643                                                            RCL_CLIENT);
644
645                                 req_capsule_set_size(pill, &RMF_EADATA,
646                                                      RCL_CLIENT,
647                                                      body->mbo_eadatasize);
648
649                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
650                                 if (lmm)
651                                         memcpy(lmm, eadata,
652                                                body->mbo_eadatasize);
653                         }
654                 }
655         } else if (it->it_op & IT_LAYOUT) {
656                 /* maybe the lock was granted right away and layout
657                  * is packed into RMF_DLM_LVB of req */
658                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
659                 if (lvb_len > 0) {
660                         lvb_data = req_capsule_server_sized_get(pill,
661                                                         &RMF_DLM_LVB, lvb_len);
662                         if (lvb_data == NULL)
663                                 RETURN(-EPROTO);
664                 }
665         }
666
667         /* fill in stripe data for layout lock.
668          * LU-6581: trust layout data only if layout lock is granted. The MDT
669          * has stopped sending layout unless the layout lock is granted. The
670          * client still does this checking in case it's talking with an old
671          * server. - Jinshan */
672         lock = ldlm_handle2lock(lockh);
673         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
674             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
675                 void *lmm;
676
677                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
678                         ldlm_it2str(it->it_op), lvb_len);
679
680                 OBD_ALLOC_LARGE(lmm, lvb_len);
681                 if (lmm == NULL) {
682                         LDLM_LOCK_PUT(lock);
683                         RETURN(-ENOMEM);
684                 }
685                 memcpy(lmm, lvb_data, lvb_len);
686
687                 /* install lvb_data */
688                 lock_res_and_lock(lock);
689                 if (lock->l_lvb_data == NULL) {
690                         lock->l_lvb_type = LVB_T_LAYOUT;
691                         lock->l_lvb_data = lmm;
692                         lock->l_lvb_len = lvb_len;
693                         lmm = NULL;
694                 }
695                 unlock_res_and_lock(lock);
696                 if (lmm != NULL)
697                         OBD_FREE_LARGE(lmm, lvb_len);
698         }
699         if (lock != NULL)
700                 LDLM_LOCK_PUT(lock);
701
702         RETURN(rc);
703 }
704
705 /* We always reserve enough space in the reply packet for a stripe MD, because
706  * we don't know in advance the file type. */
707 static int mdc_enqueue_base(struct obd_export *exp,
708                             struct ldlm_enqueue_info *einfo,
709                             const union ldlm_policy_data *policy,
710                             struct lookup_intent *it,
711                             struct md_op_data *op_data,
712                             struct lustre_handle *lockh,
713                             __u64 extra_lock_flags)
714 {
715         struct obd_device *obddev = class_exp2obd(exp);
716         struct ptlrpc_request *req = NULL;
717         __u64 flags, saved_flags = extra_lock_flags;
718         struct ldlm_res_id res_id;
719         static const union ldlm_policy_data lookup_policy = {
720                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
721         static const union ldlm_policy_data update_policy = {
722                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
723         static const union ldlm_policy_data layout_policy = {
724                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
725         static const union ldlm_policy_data getxattr_policy = {
726                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
727         int generation, resends = 0;
728         struct ldlm_reply *lockrep;
729         enum lvb_type lvb_type = 0;
730         int rc;
731         ENTRY;
732
733         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
734                  einfo->ei_type);
735         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
736
737         if (it != NULL) {
738                 LASSERT(policy == NULL);
739
740                 saved_flags |= LDLM_FL_HAS_INTENT;
741                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
742                         policy = &update_policy;
743                 else if (it->it_op & IT_LAYOUT)
744                         policy = &layout_policy;
745                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
746                         policy = &getxattr_policy;
747                 else
748                         policy = &lookup_policy;
749         }
750
751         generation = obddev->u.cli.cl_import->imp_generation;
752 resend:
753         flags = saved_flags;
754         if (it == NULL) {
755                 /* The only way right now is FLOCK. */
756                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
757                          einfo->ei_type);
758                 res_id.name[3] = LDLM_FLOCK;
759         } else if (it->it_op & IT_OPEN) {
760                 req = mdc_intent_open_pack(exp, it, op_data);
761         } else if (it->it_op & IT_UNLINK) {
762                 req = mdc_intent_unlink_pack(exp, it, op_data);
763         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
764                 req = mdc_intent_getattr_pack(exp, it, op_data);
765         } else if (it->it_op & IT_READDIR) {
766                 req = mdc_enqueue_pack(exp, 0);
767         } else if (it->it_op & IT_LAYOUT) {
768                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
769                         RETURN(-EOPNOTSUPP);
770                 req = mdc_intent_layout_pack(exp, it, op_data);
771                 lvb_type = LVB_T_LAYOUT;
772         } else if (it->it_op & IT_GETXATTR) {
773                 req = mdc_intent_getxattr_pack(exp, it, op_data);
774         } else {
775                 LBUG();
776                 RETURN(-EINVAL);
777         }
778
779         if (IS_ERR(req))
780                 RETURN(PTR_ERR(req));
781
782         if (resends) {
783                 req->rq_generation_set = 1;
784                 req->rq_import_generation = generation;
785                 req->rq_sent = cfs_time_current_sec() + resends;
786         }
787
788         /* It is important to obtain modify RPC slot first (if applicable), so
789          * that threads that are waiting for a modify RPC slot are not polluting
790          * our rpcs in flight counter.
791          * We do not do flock request limiting, though */
792         if (it) {
793                 mdc_get_mod_rpc_slot(req, it);
794                 rc = obd_get_request_slot(&obddev->u.cli);
795                 if (rc != 0) {
796                         mdc_put_mod_rpc_slot(req, it);
797                         mdc_clear_replay_flag(req, 0);
798                         ptlrpc_req_finished(req);
799                         RETURN(rc);
800                 }
801         }
802
803         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
804                               0, lvb_type, lockh, 0);
805         if (!it) {
806                 /* For flock requests we immediatelly return without further
807                    delay and let caller deal with the rest, since rest of
808                    this function metadata processing makes no sense for flock
809                    requests anyway. But in case of problem during comms with
810                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
811                    can not rely on caller and this mainly for F_UNLCKs
812                    (explicits or automatically generated by Kernel to clean
813                    current FLocks upon exit) that can't be trashed */
814                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
815                     (einfo->ei_type == LDLM_FLOCK) &&
816                     (einfo->ei_mode == LCK_NL))
817                         goto resend;
818                 RETURN(rc);
819         }
820
821         obd_put_request_slot(&obddev->u.cli);
822         mdc_put_mod_rpc_slot(req, it);
823
824         if (rc < 0) {
825                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
826                        obddev->obd_name, rc);
827
828                 mdc_clear_replay_flag(req, rc);
829                 ptlrpc_req_finished(req);
830                 RETURN(rc);
831         }
832
833         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
834         LASSERT(lockrep != NULL);
835
836         lockrep->lock_policy_res2 =
837                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
838
839         /* Retry infinitely when the server returns -EINPROGRESS for the
840          * intent operation, when server returns -EINPROGRESS for acquiring
841          * intent lock, we'll retry in after_reply(). */
842         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843                 mdc_clear_replay_flag(req, rc);
844                 ptlrpc_req_finished(req);
845                 resends++;
846
847                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848                        obddev->obd_name, resends, it->it_op,
849                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
850
851                 if (generation == obddev->u.cli.cl_import->imp_generation) {
852                         goto resend;
853                 } else {
854                         CDEBUG(D_HA, "resend cross eviction\n");
855                         RETURN(-EIO);
856                 }
857         }
858
859         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
860         if (rc < 0) {
861                 if (lustre_handle_is_used(lockh)) {
862                         ldlm_lock_decref(lockh, einfo->ei_mode);
863                         memset(lockh, 0, sizeof(*lockh));
864                 }
865                 ptlrpc_req_finished(req);
866
867                 it->it_lock_handle = 0;
868                 it->it_lock_mode = 0;
869                 it->it_request = NULL;
870         }
871
872         RETURN(rc);
873 }
874
875 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
876                 const union ldlm_policy_data *policy,
877                 struct md_op_data *op_data,
878                 struct lustre_handle *lockh, __u64 extra_lock_flags)
879 {
880         return mdc_enqueue_base(exp, einfo, policy, NULL,
881                                 op_data, lockh, extra_lock_flags);
882 }
883
884 static int mdc_finish_intent_lock(struct obd_export *exp,
885                                   struct ptlrpc_request *request,
886                                   struct md_op_data *op_data,
887                                   struct lookup_intent *it,
888                                   struct lustre_handle *lockh)
889 {
890         struct lustre_handle old_lock;
891         struct ldlm_lock *lock;
892         int rc = 0;
893         ENTRY;
894
895         LASSERT(request != NULL);
896         LASSERT(request != LP_POISON);
897         LASSERT(request->rq_repmsg != LP_POISON);
898
899         if (it->it_op & IT_READDIR)
900                 RETURN(0);
901
902         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
903                 if (it->it_status != 0)
904                         GOTO(out, rc = it->it_status);
905         } else {
906                 if (!it_disposition(it, DISP_IT_EXECD)) {
907                         /* The server failed before it even started executing
908                          * the intent, i.e. because it couldn't unpack the
909                          * request.
910                          */
911                         LASSERT(it->it_status != 0);
912                         GOTO(out, rc = it->it_status);
913                 }
914                 rc = it_open_error(DISP_IT_EXECD, it);
915                 if (rc)
916                         GOTO(out, rc);
917
918                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
919                 if (rc)
920                         GOTO(out, rc);
921
922                 /* keep requests around for the multiple phases of the call
923                  * this shows the DISP_XX must guarantee we make it into the
924                  * call
925                  */
926                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
927                     it_disposition(it, DISP_OPEN_CREATE) &&
928                     !it_open_error(DISP_OPEN_CREATE, it)) {
929                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
930                         /* balanced in ll_create_node */
931                         ptlrpc_request_addref(request);
932                 }
933                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
934                     it_disposition(it, DISP_OPEN_OPEN) &&
935                     !it_open_error(DISP_OPEN_OPEN, it)) {
936                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
937                         /* balanced in ll_file_open */
938                         ptlrpc_request_addref(request);
939                         /* BUG 11546 - eviction in the middle of open rpc
940                          * processing
941                          */
942                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
943                                          obd_timeout);
944                 }
945
946                 if (it->it_op & IT_CREAT) {
947                         /* XXX this belongs in ll_create_it */
948                 } else if (it->it_op == IT_OPEN) {
949                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
950                 } else {
951                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
952                 }
953         }
954
955         /* If we already have a matching lock, then cancel the new
956          * one.  We have to set the data here instead of in
957          * mdc_enqueue, because we need to use the child's inode as
958          * the l_ast_data to match, and that's not available until
959          * intent_finish has performed the iget().) */
960         lock = ldlm_handle2lock(lockh);
961         if (lock) {
962                 union ldlm_policy_data policy = lock->l_policy_data;
963                 LDLM_DEBUG(lock, "matching against this");
964
965                 if (it_has_reply_body(it)) {
966                         struct mdt_body *body;
967
968                         body = req_capsule_server_get(&request->rq_pill,
969                                                       &RMF_MDT_BODY);
970                         /* mdc_enqueue checked */
971                         LASSERT(body != NULL);
972                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
973                                                  &lock->l_resource->lr_name),
974                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
975                                  PLDLMRES(lock->l_resource),
976                                  PFID(&body->mbo_fid1));
977                 }
978                 LDLM_LOCK_PUT(lock);
979
980                 memcpy(&old_lock, lockh, sizeof(*lockh));
981                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
982                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
983                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
984                         memcpy(lockh, &old_lock, sizeof(old_lock));
985                         it->it_lock_handle = lockh->cookie;
986                 }
987         }
988
989         EXIT;
990 out:
991         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
992                 (int)op_data->op_namelen, op_data->op_name,
993                 ldlm_it2str(it->it_op), it->it_status,
994                 it->it_disposition, rc);
995         return rc;
996 }
997
998 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
999                         struct lu_fid *fid, __u64 *bits)
1000 {
1001         /* We could just return 1 immediately, but since we should only
1002          * be called in revalidate_it if we already have a lock, let's
1003          * verify that. */
1004         struct ldlm_res_id res_id;
1005         struct lustre_handle lockh;
1006         union ldlm_policy_data policy;
1007         enum ldlm_mode mode;
1008         ENTRY;
1009
1010         if (it->it_lock_handle) {
1011                 lockh.cookie = it->it_lock_handle;
1012                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1013         } else {
1014                 fid_build_reg_res_name(fid, &res_id);
1015                 switch (it->it_op) {
1016                 case IT_GETATTR:
1017                         /* File attributes are held under multiple bits:
1018                          * nlink is under lookup lock, size and times are
1019                          * under UPDATE lock and recently we've also got
1020                          * a separate permissions lock for owner/group/acl that
1021                          * were protected by lookup lock before.
1022                          * Getattr must provide all of that information,
1023                          * so we need to ensure we have all of those locks.
1024                          * Unfortunately, if the bits are split across multiple
1025                          * locks, there's no easy way to match all of them here,
1026                          * so an extra RPC would be performed to fetch all
1027                          * of those bits at once for now. */
1028                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1029                          * but for old MDTs (< 2.4), permission is covered
1030                          * by LOOKUP lock, so it needs to match all bits here.*/
1031                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1032                                                   MDS_INODELOCK_LOOKUP |
1033                                                   MDS_INODELOCK_PERM;
1034                         break;
1035                 case IT_READDIR:
1036                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1037                         break;
1038                 case IT_LAYOUT:
1039                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1040                         break;
1041                 default:
1042                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1043                         break;
1044                 }
1045
1046                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1047                                       LDLM_IBITS, &policy,
1048                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1049                                       &lockh);
1050         }
1051
1052         if (mode) {
1053                 it->it_lock_handle = lockh.cookie;
1054                 it->it_lock_mode = mode;
1055         } else {
1056                 it->it_lock_handle = 0;
1057                 it->it_lock_mode = 0;
1058         }
1059
1060         RETURN(!!mode);
1061 }
1062
1063 /*
1064  * This long block is all about fixing up the lock and request state
1065  * so that it is correct as of the moment _before_ the operation was
1066  * applied; that way, the VFS will think that everything is normal and
1067  * call Lustre's regular VFS methods.
1068  *
1069  * If we're performing a creation, that means that unless the creation
1070  * failed with EEXIST, we should fake up a negative dentry.
1071  *
1072  * For everything else, we want to lookup to succeed.
1073  *
1074  * One additional note: if CREATE or OPEN succeeded, we add an extra
1075  * reference to the request because we need to keep it around until
1076  * ll_create/ll_open gets called.
1077  *
1078  * The server will return to us, in it_disposition, an indication of
1079  * exactly what it_status refers to.
1080  *
1081  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1082  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1083  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1084  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1085  * was successful.
1086  *
1087  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1088  * child lookup.
1089  */
1090 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1091                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1092                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1093 {
1094         struct ldlm_enqueue_info einfo = {
1095                 .ei_type        = LDLM_IBITS,
1096                 .ei_mode        = it_to_lock_mode(it),
1097                 .ei_cb_bl       = cb_blocking,
1098                 .ei_cb_cp       = ldlm_completion_ast,
1099         };
1100         struct lustre_handle lockh;
1101         int rc = 0;
1102         ENTRY;
1103         LASSERT(it);
1104
1105         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1106                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1107                 op_data->op_name, PFID(&op_data->op_fid2),
1108                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1109                 it->it_flags);
1110
1111         lockh.cookie = 0;
1112         if (fid_is_sane(&op_data->op_fid2) &&
1113             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1114                 /* We could just return 1 immediately, but since we should only
1115                  * be called in revalidate_it if we already have a lock, let's
1116                  * verify that. */
1117                 it->it_lock_handle = 0;
1118                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1119                 /* Only return failure if it was not GETATTR by cfid
1120                    (from inode_revalidate) */
1121                 if (rc || op_data->op_namelen != 0)
1122                         RETURN(rc);
1123         }
1124
1125         /* For case if upper layer did not alloc fid, do it now. */
1126         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1127                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1128                 if (rc < 0) {
1129                         CERROR("Can't alloc new fid, rc %d\n", rc);
1130                         RETURN(rc);
1131                 }
1132         }
1133
1134         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1135                               extra_lock_flags);
1136         if (rc < 0)
1137                 RETURN(rc);
1138
1139         *reqp = it->it_request;
1140         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1141         RETURN(rc);
1142 }
1143
1144 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1145                                               struct ptlrpc_request *req,
1146                                               void *args, int rc)
1147 {
1148         struct mdc_getattr_args  *ga = args;
1149         struct obd_export        *exp = ga->ga_exp;
1150         struct md_enqueue_info   *minfo = ga->ga_minfo;
1151         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1152         struct lookup_intent     *it;
1153         struct lustre_handle     *lockh;
1154         struct obd_device        *obddev;
1155         struct ldlm_reply        *lockrep;
1156         __u64                     flags = LDLM_FL_HAS_INTENT;
1157         ENTRY;
1158
1159         it    = &minfo->mi_it;
1160         lockh = &minfo->mi_lockh;
1161
1162         obddev = class_exp2obd(exp);
1163
1164         obd_put_request_slot(&obddev->u.cli);
1165         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1166                 rc = -ETIMEDOUT;
1167
1168         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1169                                    &flags, NULL, 0, lockh, rc);
1170         if (rc < 0) {
1171                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1172                 mdc_clear_replay_flag(req, rc);
1173                 GOTO(out, rc);
1174         }
1175
1176         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1177         LASSERT(lockrep != NULL);
1178
1179         lockrep->lock_policy_res2 =
1180                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1181
1182         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1183         if (rc)
1184                 GOTO(out, rc);
1185
1186         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1187         EXIT;
1188
1189 out:
1190         minfo->mi_cb(req, minfo, rc);
1191         return 0;
1192 }
1193
1194 int mdc_intent_getattr_async(struct obd_export *exp,
1195                              struct md_enqueue_info *minfo)
1196 {
1197         struct md_op_data       *op_data = &minfo->mi_data;
1198         struct lookup_intent    *it = &minfo->mi_it;
1199         struct ptlrpc_request   *req;
1200         struct mdc_getattr_args *ga;
1201         struct obd_device       *obddev = class_exp2obd(exp);
1202         struct ldlm_res_id       res_id;
1203         union ldlm_policy_data policy = {
1204                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1205                                                  MDS_INODELOCK_UPDATE } };
1206         int                      rc = 0;
1207         __u64                    flags = LDLM_FL_HAS_INTENT;
1208         ENTRY;
1209
1210         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1211                 (int)op_data->op_namelen, op_data->op_name,
1212                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1213
1214         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1215         req = mdc_intent_getattr_pack(exp, it, op_data);
1216         if (IS_ERR(req))
1217                 RETURN(PTR_ERR(req));
1218
1219         rc = obd_get_request_slot(&obddev->u.cli);
1220         if (rc != 0) {
1221                 ptlrpc_req_finished(req);
1222                 RETURN(rc);
1223         }
1224
1225         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1226                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1227         if (rc < 0) {
1228                 obd_put_request_slot(&obddev->u.cli);
1229                 ptlrpc_req_finished(req);
1230                 RETURN(rc);
1231         }
1232
1233         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1234         ga = ptlrpc_req_async_args(req);
1235         ga->ga_exp = exp;
1236         ga->ga_minfo = minfo;
1237
1238         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1239         ptlrpcd_add_req(req);
1240
1241         RETURN(0);
1242 }