Whamcloud - gitweb
LU-7433 ldlm: xattr locks are lost on mdt
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
50
51 #include "mdc_internal.h"
52
53 struct mdc_getattr_args {
54         struct obd_export               *ga_exp;
55         struct md_enqueue_info          *ga_minfo;
56 };
57
58 int it_open_error(int phase, struct lookup_intent *it)
59 {
60         if (it_disposition(it, DISP_OPEN_LEASE)) {
61                 if (phase >= DISP_OPEN_LEASE)
62                         return it->it_status;
63                 else
64                         return 0;
65         }
66         if (it_disposition(it, DISP_OPEN_OPEN)) {
67                 if (phase >= DISP_OPEN_OPEN)
68                         return it->it_status;
69                 else
70                         return 0;
71         }
72
73         if (it_disposition(it, DISP_OPEN_CREATE)) {
74                 if (phase >= DISP_OPEN_CREATE)
75                         return it->it_status;
76                 else
77                         return 0;
78         }
79
80         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81                 if (phase >= DISP_LOOKUP_EXECD)
82                         return it->it_status;
83                 else
84                         return 0;
85         }
86
87         if (it_disposition(it, DISP_IT_EXECD)) {
88                 if (phase >= DISP_IT_EXECD)
89                         return it->it_status;
90                 else
91                         return 0;
92         }
93
94         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95         LBUG();
96
97         return 0;
98 }
99 EXPORT_SYMBOL(it_open_error);
100
101 /* this must be called on a lockh that is known to have a referenced lock */
102 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
103                       void *data, __u64 *bits)
104 {
105         struct ldlm_lock *lock;
106         struct inode *new_inode = data;
107         ENTRY;
108
109         if(bits)
110                 *bits = 0;
111
112         if (!lustre_handle_is_used(lockh))
113                 RETURN(0);
114
115         lock = ldlm_handle2lock(lockh);
116
117         LASSERT(lock != NULL);
118         lock_res_and_lock(lock);
119         if (lock->l_resource->lr_lvb_inode &&
120             lock->l_resource->lr_lvb_inode != data) {
121                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
122                 LASSERTF(old_inode->i_state & I_FREEING,
123                          "Found existing inode %p/%lu/%u state %lu in lock: "
124                          "setting data to %p/%lu/%u\n", old_inode,
125                          old_inode->i_ino, old_inode->i_generation,
126                          old_inode->i_state,
127                          new_inode, new_inode->i_ino, new_inode->i_generation);
128         }
129         lock->l_resource->lr_lvb_inode = new_inode;
130         if (bits)
131                 *bits = lock->l_policy_data.l_inodebits.bits;
132
133         unlock_res_and_lock(lock);
134         LDLM_LOCK_PUT(lock);
135
136         RETURN(0);
137 }
138
139 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
140                               const struct lu_fid *fid, enum ldlm_type type,
141                               union ldlm_policy_data *policy,
142                               enum ldlm_mode mode, struct lustre_handle *lockh)
143 {
144         struct ldlm_res_id res_id;
145         enum ldlm_mode rc;
146         ENTRY;
147
148         fid_build_reg_res_name(fid, &res_id);
149         /* LU-4405: Clear bits not supported by server */
150         policy->l_inodebits.bits &= exp_connect_ibits(exp);
151         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
152                              &res_id, type, policy, mode, lockh, 0);
153         RETURN(rc);
154 }
155
156 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
157                       union ldlm_policy_data *policy, enum ldlm_mode mode,
158                       enum ldlm_cancel_flags flags, void *opaque)
159 {
160         struct obd_device *obd = class_exp2obd(exp);
161         struct ldlm_res_id res_id;
162         int rc;
163
164         ENTRY;
165
166         fid_build_reg_res_name(fid, &res_id);
167         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168                                              policy, mode, flags, opaque);
169         RETURN(rc);
170 }
171
172 int mdc_null_inode(struct obd_export *exp,
173                    const struct lu_fid *fid)
174 {
175         struct ldlm_res_id res_id;
176         struct ldlm_resource *res;
177         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
178         ENTRY;
179
180         LASSERTF(ns != NULL, "no namespace passed\n");
181
182         fid_build_reg_res_name(fid, &res_id);
183
184         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185         if (IS_ERR(res))
186                 RETURN(0);
187
188         lock_res(res);
189         res->lr_lvb_inode = NULL;
190         unlock_res(res);
191
192         ldlm_resource_putref(res);
193         RETURN(0);
194 }
195
196 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
197 {
198         /* Don't hold error requests for replay. */
199         if (req->rq_replay) {
200                 spin_lock(&req->rq_lock);
201                 req->rq_replay = 0;
202                 spin_unlock(&req->rq_lock);
203         }
204         if (rc && req->rq_transno != 0) {
205                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206                 LBUG();
207         }
208 }
209
210 /* Save a large LOV EA into the request buffer so that it is available
211  * for replay.  We don't do this in the initial request because the
212  * original request doesn't need this buffer (at most it sends just the
213  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
214  * buffer and may also be difficult to allocate and save a very large
215  * request buffer for each open. (bug 5707)
216  *
217  * OOM here may cause recovery failure if lmm is needed (only for the
218  * original open if the MDS crashed just when this client also OOM'd)
219  * but this is incredibly unlikely, and questionable whether the client
220  * could do MDS recovery under OOM anyways... */
221 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
222                                 struct mdt_body *body)
223 {
224         int     rc;
225
226         /* FIXME: remove this explicit offset. */
227         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228                                         body->mbo_eadatasize);
229         if (rc) {
230                 CERROR("Can't enlarge segment %d size to %d\n",
231                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
232                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
233                 body->mbo_eadatasize = 0;
234         }
235 }
236
237 static struct ptlrpc_request *
238 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
239                      struct md_op_data *op_data)
240 {
241         struct ptlrpc_request   *req;
242         struct obd_device       *obddev = class_exp2obd(exp);
243         struct ldlm_intent      *lit;
244         const void              *lmm = op_data->op_data;
245         __u32                    lmmsize = op_data->op_data_size;
246         struct list_head         cancels = LIST_HEAD_INIT(cancels);
247         int                      count = 0;
248         enum ldlm_mode           mode;
249         int                      rc;
250         ENTRY;
251
252         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
253
254         /* XXX: openlock is not cancelled for cross-refs. */
255         /* If inode is known, cancel conflicting OPEN locks. */
256         if (fid_is_sane(&op_data->op_fid2)) {
257                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
258                         if (it->it_flags & FMODE_WRITE)
259                                 mode = LCK_EX;
260                         else
261                                 mode = LCK_PR;
262                 } else {
263                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
264                                 mode = LCK_CW;
265 #ifdef FMODE_EXEC
266                         else if (it->it_flags & FMODE_EXEC)
267                                 mode = LCK_PR;
268 #endif
269                         else
270                                 mode = LCK_CR;
271                 }
272                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
273                                                 &cancels, mode,
274                                                 MDS_INODELOCK_OPEN);
275         }
276
277         /* If CREATE, cancel parent's UPDATE lock. */
278         if (it->it_op & IT_CREAT)
279                 mode = LCK_EX;
280         else
281                 mode = LCK_CR;
282         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
283                                          &cancels, mode,
284                                          MDS_INODELOCK_UPDATE);
285
286         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287                                    &RQF_LDLM_INTENT_OPEN);
288         if (req == NULL) {
289                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292
293         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
294                              op_data->op_namelen + 1);
295         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
296                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297
298         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
299                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
300                              strlen(op_data->op_file_secctx_name) + 1 : 0);
301
302         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
303                              op_data->op_file_secctx_size);
304
305         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
306         if (rc < 0) {
307                 ptlrpc_request_free(req);
308                 RETURN(ERR_PTR(rc));
309         }
310
311         spin_lock(&req->rq_lock);
312         req->rq_replay = req->rq_import->imp_replayable;
313         spin_unlock(&req->rq_lock);
314
315         /* pack the intent */
316         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
317         lit->opc = (__u64)it->it_op;
318
319         /* pack the intended request */
320         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
321                       lmmsize);
322
323         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
324                              obddev->u.cli.cl_max_mds_easize);
325         ptlrpc_request_set_replen(req);
326         return req;
327 }
328
329 static struct ptlrpc_request *
330 mdc_intent_getxattr_pack(struct obd_export *exp,
331                          struct lookup_intent *it,
332                          struct md_op_data *op_data)
333 {
334         struct ptlrpc_request   *req;
335         struct ldlm_intent      *lit;
336         int                     rc, count = 0;
337         __u32                   maxdata;
338         struct list_head        cancels = LIST_HEAD_INIT(cancels);
339
340         ENTRY;
341
342         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343                                         &RQF_LDLM_INTENT_GETXATTR);
344         if (req == NULL)
345                 RETURN(ERR_PTR(-ENOMEM));
346
347         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
348         if (rc) {
349                 ptlrpc_request_free(req);
350                 RETURN(ERR_PTR(rc));
351         }
352
353         /* pack the intent */
354         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
355         lit->opc = IT_GETXATTR;
356
357         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
358
359         /* pack the intended request */
360         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
361                       0);
362
363         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
364                                 RCL_SERVER, maxdata);
365
366         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
367                                 RCL_SERVER, maxdata);
368
369         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
370                                 RCL_SERVER, maxdata);
371
372         ptlrpc_request_set_replen(req);
373
374         RETURN(req);
375 }
376
377 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
378                                                      struct lookup_intent *it,
379                                                      struct md_op_data *op_data)
380 {
381         struct ptlrpc_request *req;
382         struct obd_device     *obddev = class_exp2obd(exp);
383         struct ldlm_intent    *lit;
384         int                    rc;
385         ENTRY;
386
387         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388                                    &RQF_LDLM_INTENT_UNLINK);
389         if (req == NULL)
390                 RETURN(ERR_PTR(-ENOMEM));
391
392         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
393                              op_data->op_namelen + 1);
394
395         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = (__u64)it->it_op;
404
405         /* pack the intended request */
406         mdc_unlink_pack(req, op_data);
407
408         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
409                              obddev->u.cli.cl_default_mds_easize);
410         ptlrpc_request_set_replen(req);
411         RETURN(req);
412 }
413
414 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
415                                                       struct lookup_intent *it,
416                                                       struct md_op_data *op_data)
417 {
418         struct ptlrpc_request   *req;
419         struct obd_device       *obddev = class_exp2obd(exp);
420         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
421                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
422                                          OBD_MD_MEA | OBD_MD_FLACL;
423         struct ldlm_intent      *lit;
424         int                      rc;
425         __u32                    easize;
426         ENTRY;
427
428         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
429                                    &RQF_LDLM_INTENT_GETATTR);
430         if (req == NULL)
431                 RETURN(ERR_PTR(-ENOMEM));
432
433         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
434                              op_data->op_namelen + 1);
435
436         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
437         if (rc) {
438                 ptlrpc_request_free(req);
439                 RETURN(ERR_PTR(rc));
440         }
441
442         /* pack the intent */
443         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
444         lit->opc = (__u64)it->it_op;
445
446         if (obddev->u.cli.cl_default_mds_easize > 0)
447                 easize = obddev->u.cli.cl_default_mds_easize;
448         else
449                 easize = obddev->u.cli.cl_max_mds_easize;
450
451         /* pack the intended request */
452         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
453
454         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
455         ptlrpc_request_set_replen(req);
456         RETURN(req);
457 }
458
459 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
460                                                      struct lookup_intent *it,
461                                                      struct md_op_data *unused)
462 {
463         struct obd_device     *obd = class_exp2obd(exp);
464         struct ptlrpc_request *req;
465         struct ldlm_intent    *lit;
466         struct layout_intent  *layout;
467         int rc;
468         ENTRY;
469
470         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
471                                 &RQF_LDLM_INTENT_LAYOUT);
472         if (req == NULL)
473                 RETURN(ERR_PTR(-ENOMEM));
474
475         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
476         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477         if (rc) {
478                 ptlrpc_request_free(req);
479                 RETURN(ERR_PTR(rc));
480         }
481
482         /* pack the intent */
483         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484         lit->opc = (__u64)it->it_op;
485
486         /* pack the layout intent request */
487         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
488         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
489          * set for replication */
490         layout->li_opc = LAYOUT_INTENT_ACCESS;
491
492         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
493                              obd->u.cli.cl_default_mds_easize);
494         ptlrpc_request_set_replen(req);
495         RETURN(req);
496 }
497
498 static struct ptlrpc_request *
499 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
500 {
501         struct ptlrpc_request *req;
502         int rc;
503         ENTRY;
504
505         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
506         if (req == NULL)
507                 RETURN(ERR_PTR(-ENOMEM));
508
509         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
510         if (rc) {
511                 ptlrpc_request_free(req);
512                 RETURN(ERR_PTR(rc));
513         }
514
515         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
516         ptlrpc_request_set_replen(req);
517         RETURN(req);
518 }
519
520 static int mdc_finish_enqueue(struct obd_export *exp,
521                               struct ptlrpc_request *req,
522                               struct ldlm_enqueue_info *einfo,
523                               struct lookup_intent *it,
524                               struct lustre_handle *lockh,
525                               int rc)
526 {
527         struct req_capsule  *pill = &req->rq_pill;
528         struct ldlm_request *lockreq;
529         struct ldlm_reply   *lockrep;
530         struct ldlm_lock    *lock;
531         void                *lvb_data = NULL;
532         __u32                lvb_len = 0;
533         ENTRY;
534
535         LASSERT(rc >= 0);
536         /* Similarly, if we're going to replay this request, we don't want to
537          * actually get a lock, just perform the intent. */
538         if (req->rq_transno || req->rq_replay) {
539                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
540                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
541         }
542
543         if (rc == ELDLM_LOCK_ABORTED) {
544                 einfo->ei_mode = 0;
545                 memset(lockh, 0, sizeof(*lockh));
546                 rc = 0;
547         } else { /* rc = 0 */
548                 lock = ldlm_handle2lock(lockh);
549                 LASSERT(lock != NULL);
550
551                 /* If the server gave us back a different lock mode, we should
552                  * fix up our variables. */
553                 if (lock->l_req_mode != einfo->ei_mode) {
554                         ldlm_lock_addref(lockh, lock->l_req_mode);
555                         ldlm_lock_decref(lockh, einfo->ei_mode);
556                         einfo->ei_mode = lock->l_req_mode;
557                 }
558                 LDLM_LOCK_PUT(lock);
559         }
560
561         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
562         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
563
564         it->it_disposition = (int)lockrep->lock_policy_res1;
565         it->it_status = (int)lockrep->lock_policy_res2;
566         it->it_lock_mode = einfo->ei_mode;
567         it->it_lock_handle = lockh->cookie;
568         it->it_request = req;
569
570         /* Technically speaking rq_transno must already be zero if
571          * it_status is in error, so the check is a bit redundant */
572         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
573                 mdc_clear_replay_flag(req, it->it_status);
574
575         /* If we're doing an IT_OPEN which did not result in an actual
576          * successful open, then we need to remove the bit which saves
577          * this request for unconditional replay.
578          *
579          * It's important that we do this first!  Otherwise we might exit the
580          * function without doing so, and try to replay a failed create
581          * (bug 3440) */
582         if (it->it_op & IT_OPEN && req->rq_replay &&
583             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
584                 mdc_clear_replay_flag(req, it->it_status);
585
586         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
587                   it->it_op, it->it_disposition, it->it_status);
588
589         /* We know what to expect, so we do any byte flipping required here */
590         if (it_has_reply_body(it)) {
591                 struct mdt_body *body;
592
593                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
594                 if (body == NULL) {
595                         CERROR ("Can't swab mdt_body\n");
596                         RETURN (-EPROTO);
597                 }
598
599                 if (it_disposition(it, DISP_OPEN_OPEN) &&
600                     !it_open_error(DISP_OPEN_OPEN, it)) {
601                         /*
602                          * If this is a successful OPEN request, we need to set
603                          * replay handler and data early, so that if replay
604                          * happens immediately after swabbing below, new reply
605                          * is swabbed by that handler correctly.
606                          */
607                         mdc_set_open_replay_data(NULL, NULL, it);
608                 }
609
610                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
611                         void *eadata;
612
613                         mdc_update_max_ea_from_body(exp, body);
614
615                         /*
616                          * The eadata is opaque; just check that it is there.
617                          * Eventually, obd_unpackmd() will check the contents.
618                          */
619                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
620                                                         body->mbo_eadatasize);
621                         if (eadata == NULL)
622                                 RETURN(-EPROTO);
623
624                         /* save lvb data and length in case this is for layout
625                          * lock */
626                         lvb_data = eadata;
627                         lvb_len = body->mbo_eadatasize;
628
629                         /*
630                          * We save the reply LOV EA in case we have to replay a
631                          * create for recovery.  If we didn't allocate a large
632                          * enough request buffer above we need to reallocate it
633                          * here to hold the actual LOV EA.
634                          *
635                          * To not save LOV EA if request is not going to replay
636                          * (for example error one).
637                          */
638                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
639                                 void *lmm;
640                                 if (req_capsule_get_size(pill, &RMF_EADATA,
641                                                          RCL_CLIENT) <
642                                     body->mbo_eadatasize)
643                                         mdc_realloc_openmsg(req, body);
644                                 else
645                                         req_capsule_shrink(pill, &RMF_EADATA,
646                                                            body->mbo_eadatasize,
647                                                            RCL_CLIENT);
648
649                                 req_capsule_set_size(pill, &RMF_EADATA,
650                                                      RCL_CLIENT,
651                                                      body->mbo_eadatasize);
652
653                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
654                                 if (lmm)
655                                         memcpy(lmm, eadata,
656                                                body->mbo_eadatasize);
657                         }
658                 }
659         } else if (it->it_op & IT_LAYOUT) {
660                 /* maybe the lock was granted right away and layout
661                  * is packed into RMF_DLM_LVB of req */
662                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
663                 if (lvb_len > 0) {
664                         lvb_data = req_capsule_server_sized_get(pill,
665                                                         &RMF_DLM_LVB, lvb_len);
666                         if (lvb_data == NULL)
667                                 RETURN(-EPROTO);
668                 }
669         }
670
671         /* fill in stripe data for layout lock.
672          * LU-6581: trust layout data only if layout lock is granted. The MDT
673          * has stopped sending layout unless the layout lock is granted. The
674          * client still does this checking in case it's talking with an old
675          * server. - Jinshan */
676         lock = ldlm_handle2lock(lockh);
677         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
678             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
679                 void *lmm;
680
681                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
682                         ldlm_it2str(it->it_op), lvb_len);
683
684                 OBD_ALLOC_LARGE(lmm, lvb_len);
685                 if (lmm == NULL) {
686                         LDLM_LOCK_PUT(lock);
687                         RETURN(-ENOMEM);
688                 }
689                 memcpy(lmm, lvb_data, lvb_len);
690
691                 /* install lvb_data */
692                 lock_res_and_lock(lock);
693                 if (lock->l_lvb_data == NULL) {
694                         lock->l_lvb_type = LVB_T_LAYOUT;
695                         lock->l_lvb_data = lmm;
696                         lock->l_lvb_len = lvb_len;
697                         lmm = NULL;
698                 }
699                 unlock_res_and_lock(lock);
700                 if (lmm != NULL)
701                         OBD_FREE_LARGE(lmm, lvb_len);
702         }
703         if (lock != NULL)
704                 LDLM_LOCK_PUT(lock);
705
706         RETURN(rc);
707 }
708
709 /* We always reserve enough space in the reply packet for a stripe MD, because
710  * we don't know in advance the file type. */
711 static int mdc_enqueue_base(struct obd_export *exp,
712                             struct ldlm_enqueue_info *einfo,
713                             const union ldlm_policy_data *policy,
714                             struct lookup_intent *it,
715                             struct md_op_data *op_data,
716                             struct lustre_handle *lockh,
717                             __u64 extra_lock_flags)
718 {
719         struct obd_device *obddev = class_exp2obd(exp);
720         struct ptlrpc_request *req = NULL;
721         __u64 flags, saved_flags = extra_lock_flags;
722         struct ldlm_res_id res_id;
723         static const union ldlm_policy_data lookup_policy = {
724                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
725         static const union ldlm_policy_data update_policy = {
726                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
727         static const union ldlm_policy_data layout_policy = {
728                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
729         static const union ldlm_policy_data getxattr_policy = {
730                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
731         int generation, resends = 0;
732         struct ldlm_reply *lockrep;
733         enum lvb_type lvb_type = 0;
734         int rc;
735         ENTRY;
736
737         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
738                  einfo->ei_type);
739         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
740
741         if (it != NULL) {
742                 LASSERT(policy == NULL);
743
744                 saved_flags |= LDLM_FL_HAS_INTENT;
745                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
746                         policy = &update_policy;
747                 else if (it->it_op & IT_LAYOUT)
748                         policy = &layout_policy;
749                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
750                         policy = &getxattr_policy;
751                 else
752                         policy = &lookup_policy;
753         }
754
755         generation = obddev->u.cli.cl_import->imp_generation;
756 resend:
757         flags = saved_flags;
758         if (it == NULL) {
759                 /* The only way right now is FLOCK. */
760                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
761                          einfo->ei_type);
762                 res_id.name[3] = LDLM_FLOCK;
763         } else if (it->it_op & IT_OPEN) {
764                 req = mdc_intent_open_pack(exp, it, op_data);
765         } else if (it->it_op & IT_UNLINK) {
766                 req = mdc_intent_unlink_pack(exp, it, op_data);
767         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
768                 req = mdc_intent_getattr_pack(exp, it, op_data);
769         } else if (it->it_op & IT_READDIR) {
770                 req = mdc_enqueue_pack(exp, 0);
771         } else if (it->it_op & IT_LAYOUT) {
772                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
773                         RETURN(-EOPNOTSUPP);
774                 req = mdc_intent_layout_pack(exp, it, op_data);
775                 lvb_type = LVB_T_LAYOUT;
776         } else if (it->it_op & IT_GETXATTR) {
777                 req = mdc_intent_getxattr_pack(exp, it, op_data);
778         } else {
779                 LBUG();
780                 RETURN(-EINVAL);
781         }
782
783         if (IS_ERR(req))
784                 RETURN(PTR_ERR(req));
785
786         if (resends) {
787                 req->rq_generation_set = 1;
788                 req->rq_import_generation = generation;
789                 req->rq_sent = cfs_time_current_sec() + resends;
790         }
791
792         /* It is important to obtain modify RPC slot first (if applicable), so
793          * that threads that are waiting for a modify RPC slot are not polluting
794          * our rpcs in flight counter.
795          * We do not do flock request limiting, though */
796         if (it) {
797                 mdc_get_mod_rpc_slot(req, it);
798                 rc = obd_get_request_slot(&obddev->u.cli);
799                 if (rc != 0) {
800                         mdc_put_mod_rpc_slot(req, it);
801                         mdc_clear_replay_flag(req, 0);
802                         ptlrpc_req_finished(req);
803                         RETURN(rc);
804                 }
805         }
806
807         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
808                               0, lvb_type, lockh, 0);
809         if (!it) {
810                 /* For flock requests we immediatelly return without further
811                    delay and let caller deal with the rest, since rest of
812                    this function metadata processing makes no sense for flock
813                    requests anyway. But in case of problem during comms with
814                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
815                    can not rely on caller and this mainly for F_UNLCKs
816                    (explicits or automatically generated by Kernel to clean
817                    current FLocks upon exit) that can't be trashed */
818                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
819                     (einfo->ei_type == LDLM_FLOCK) &&
820                     (einfo->ei_mode == LCK_NL))
821                         goto resend;
822                 RETURN(rc);
823         }
824
825         obd_put_request_slot(&obddev->u.cli);
826         mdc_put_mod_rpc_slot(req, it);
827
828         if (rc < 0) {
829                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
830                        obddev->obd_name, rc);
831
832                 mdc_clear_replay_flag(req, rc);
833                 ptlrpc_req_finished(req);
834                 RETURN(rc);
835         }
836
837         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
838         LASSERT(lockrep != NULL);
839
840         lockrep->lock_policy_res2 =
841                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
842
843         /* Retry infinitely when the server returns -EINPROGRESS for the
844          * intent operation, when server returns -EINPROGRESS for acquiring
845          * intent lock, we'll retry in after_reply(). */
846         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
847                 mdc_clear_replay_flag(req, rc);
848                 ptlrpc_req_finished(req);
849                 resends++;
850
851                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
852                        obddev->obd_name, resends, it->it_op,
853                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
854
855                 if (generation == obddev->u.cli.cl_import->imp_generation) {
856                         goto resend;
857                 } else {
858                         CDEBUG(D_HA, "resend cross eviction\n");
859                         RETURN(-EIO);
860                 }
861         }
862
863         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
864         if (rc < 0) {
865                 if (lustre_handle_is_used(lockh)) {
866                         ldlm_lock_decref(lockh, einfo->ei_mode);
867                         memset(lockh, 0, sizeof(*lockh));
868                 }
869                 ptlrpc_req_finished(req);
870
871                 it->it_lock_handle = 0;
872                 it->it_lock_mode = 0;
873                 it->it_request = NULL;
874         }
875
876         RETURN(rc);
877 }
878
879 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
880                 const union ldlm_policy_data *policy,
881                 struct md_op_data *op_data,
882                 struct lustre_handle *lockh, __u64 extra_lock_flags)
883 {
884         return mdc_enqueue_base(exp, einfo, policy, NULL,
885                                 op_data, lockh, extra_lock_flags);
886 }
887
888 static int mdc_finish_intent_lock(struct obd_export *exp,
889                                   struct ptlrpc_request *request,
890                                   struct md_op_data *op_data,
891                                   struct lookup_intent *it,
892                                   struct lustre_handle *lockh)
893 {
894         struct lustre_handle old_lock;
895         struct ldlm_lock *lock;
896         int rc = 0;
897         ENTRY;
898
899         LASSERT(request != NULL);
900         LASSERT(request != LP_POISON);
901         LASSERT(request->rq_repmsg != LP_POISON);
902
903         if (it->it_op & IT_READDIR)
904                 RETURN(0);
905
906         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
907                 if (it->it_status != 0)
908                         GOTO(out, rc = it->it_status);
909         } else {
910                 if (!it_disposition(it, DISP_IT_EXECD)) {
911                         /* The server failed before it even started executing
912                          * the intent, i.e. because it couldn't unpack the
913                          * request.
914                          */
915                         LASSERT(it->it_status != 0);
916                         GOTO(out, rc = it->it_status);
917                 }
918                 rc = it_open_error(DISP_IT_EXECD, it);
919                 if (rc)
920                         GOTO(out, rc);
921
922                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
923                 if (rc)
924                         GOTO(out, rc);
925
926                 /* keep requests around for the multiple phases of the call
927                  * this shows the DISP_XX must guarantee we make it into the
928                  * call
929                  */
930                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
931                     it_disposition(it, DISP_OPEN_CREATE) &&
932                     !it_open_error(DISP_OPEN_CREATE, it)) {
933                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
934                         /* balanced in ll_create_node */
935                         ptlrpc_request_addref(request);
936                 }
937                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
938                     it_disposition(it, DISP_OPEN_OPEN) &&
939                     !it_open_error(DISP_OPEN_OPEN, it)) {
940                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
941                         /* balanced in ll_file_open */
942                         ptlrpc_request_addref(request);
943                         /* BUG 11546 - eviction in the middle of open rpc
944                          * processing
945                          */
946                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
947                                          obd_timeout);
948                 }
949
950                 if (it->it_op & IT_CREAT) {
951                         /* XXX this belongs in ll_create_it */
952                 } else if (it->it_op == IT_OPEN) {
953                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
954                 } else {
955                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
956                 }
957         }
958
959         /* If we already have a matching lock, then cancel the new
960          * one.  We have to set the data here instead of in
961          * mdc_enqueue, because we need to use the child's inode as
962          * the l_ast_data to match, and that's not available until
963          * intent_finish has performed the iget().) */
964         lock = ldlm_handle2lock(lockh);
965         if (lock) {
966                 union ldlm_policy_data policy = lock->l_policy_data;
967                 LDLM_DEBUG(lock, "matching against this");
968
969                 if (it_has_reply_body(it)) {
970                         struct mdt_body *body;
971
972                         body = req_capsule_server_get(&request->rq_pill,
973                                                       &RMF_MDT_BODY);
974                         /* mdc_enqueue checked */
975                         LASSERT(body != NULL);
976                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
977                                                  &lock->l_resource->lr_name),
978                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
979                                  PLDLMRES(lock->l_resource),
980                                  PFID(&body->mbo_fid1));
981                 }
982                 LDLM_LOCK_PUT(lock);
983
984                 memcpy(&old_lock, lockh, sizeof(*lockh));
985                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
986                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
987                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
988                         memcpy(lockh, &old_lock, sizeof(old_lock));
989                         it->it_lock_handle = lockh->cookie;
990                 }
991         }
992
993         EXIT;
994 out:
995         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
996                 (int)op_data->op_namelen, op_data->op_name,
997                 ldlm_it2str(it->it_op), it->it_status,
998                 it->it_disposition, rc);
999         return rc;
1000 }
1001
1002 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1003                         struct lu_fid *fid, __u64 *bits)
1004 {
1005         /* We could just return 1 immediately, but since we should only
1006          * be called in revalidate_it if we already have a lock, let's
1007          * verify that. */
1008         struct ldlm_res_id res_id;
1009         struct lustre_handle lockh;
1010         union ldlm_policy_data policy;
1011         enum ldlm_mode mode;
1012         ENTRY;
1013
1014         if (it->it_lock_handle) {
1015                 lockh.cookie = it->it_lock_handle;
1016                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1017         } else {
1018                 fid_build_reg_res_name(fid, &res_id);
1019                 switch (it->it_op) {
1020                 case IT_GETATTR:
1021                         /* File attributes are held under multiple bits:
1022                          * nlink is under lookup lock, size and times are
1023                          * under UPDATE lock and recently we've also got
1024                          * a separate permissions lock for owner/group/acl that
1025                          * were protected by lookup lock before.
1026                          * Getattr must provide all of that information,
1027                          * so we need to ensure we have all of those locks.
1028                          * Unfortunately, if the bits are split across multiple
1029                          * locks, there's no easy way to match all of them here,
1030                          * so an extra RPC would be performed to fetch all
1031                          * of those bits at once for now. */
1032                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1033                          * but for old MDTs (< 2.4), permission is covered
1034                          * by LOOKUP lock, so it needs to match all bits here.*/
1035                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1036                                                   MDS_INODELOCK_LOOKUP |
1037                                                   MDS_INODELOCK_PERM;
1038                         break;
1039                 case IT_READDIR:
1040                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1041                         break;
1042                 case IT_LAYOUT:
1043                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1044                         break;
1045                 default:
1046                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1047                         break;
1048                 }
1049
1050                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1051                                       LDLM_IBITS, &policy,
1052                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1053                                       &lockh);
1054         }
1055
1056         if (mode) {
1057                 it->it_lock_handle = lockh.cookie;
1058                 it->it_lock_mode = mode;
1059         } else {
1060                 it->it_lock_handle = 0;
1061                 it->it_lock_mode = 0;
1062         }
1063
1064         RETURN(!!mode);
1065 }
1066
1067 /*
1068  * This long block is all about fixing up the lock and request state
1069  * so that it is correct as of the moment _before_ the operation was
1070  * applied; that way, the VFS will think that everything is normal and
1071  * call Lustre's regular VFS methods.
1072  *
1073  * If we're performing a creation, that means that unless the creation
1074  * failed with EEXIST, we should fake up a negative dentry.
1075  *
1076  * For everything else, we want to lookup to succeed.
1077  *
1078  * One additional note: if CREATE or OPEN succeeded, we add an extra
1079  * reference to the request because we need to keep it around until
1080  * ll_create/ll_open gets called.
1081  *
1082  * The server will return to us, in it_disposition, an indication of
1083  * exactly what it_status refers to.
1084  *
1085  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1086  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1087  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1088  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1089  * was successful.
1090  *
1091  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1092  * child lookup.
1093  */
1094 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1095                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1096                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1097 {
1098         struct ldlm_enqueue_info einfo = {
1099                 .ei_type        = LDLM_IBITS,
1100                 .ei_mode        = it_to_lock_mode(it),
1101                 .ei_cb_bl       = cb_blocking,
1102                 .ei_cb_cp       = ldlm_completion_ast,
1103         };
1104         struct lustre_handle lockh;
1105         int rc = 0;
1106         ENTRY;
1107         LASSERT(it);
1108
1109         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1110                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1111                 op_data->op_name, PFID(&op_data->op_fid2),
1112                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1113                 it->it_flags);
1114
1115         lockh.cookie = 0;
1116         if (fid_is_sane(&op_data->op_fid2) &&
1117             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1118                 /* We could just return 1 immediately, but since we should only
1119                  * be called in revalidate_it if we already have a lock, let's
1120                  * verify that. */
1121                 it->it_lock_handle = 0;
1122                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1123                 /* Only return failure if it was not GETATTR by cfid
1124                    (from inode_revalidate) */
1125                 if (rc || op_data->op_namelen != 0)
1126                         RETURN(rc);
1127         }
1128
1129         /* For case if upper layer did not alloc fid, do it now. */
1130         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1131                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1132                 if (rc < 0) {
1133                         CERROR("Can't alloc new fid, rc %d\n", rc);
1134                         RETURN(rc);
1135                 }
1136         }
1137
1138         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1139                               extra_lock_flags);
1140         if (rc < 0)
1141                 RETURN(rc);
1142
1143         *reqp = it->it_request;
1144         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1145         RETURN(rc);
1146 }
1147
1148 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1149                                               struct ptlrpc_request *req,
1150                                               void *args, int rc)
1151 {
1152         struct mdc_getattr_args  *ga = args;
1153         struct obd_export        *exp = ga->ga_exp;
1154         struct md_enqueue_info   *minfo = ga->ga_minfo;
1155         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1156         struct lookup_intent     *it;
1157         struct lustre_handle     *lockh;
1158         struct obd_device        *obddev;
1159         struct ldlm_reply        *lockrep;
1160         __u64                     flags = LDLM_FL_HAS_INTENT;
1161         ENTRY;
1162
1163         it    = &minfo->mi_it;
1164         lockh = &minfo->mi_lockh;
1165
1166         obddev = class_exp2obd(exp);
1167
1168         obd_put_request_slot(&obddev->u.cli);
1169         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1170                 rc = -ETIMEDOUT;
1171
1172         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1173                                    &flags, NULL, 0, lockh, rc);
1174         if (rc < 0) {
1175                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1176                 mdc_clear_replay_flag(req, rc);
1177                 GOTO(out, rc);
1178         }
1179
1180         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1181         LASSERT(lockrep != NULL);
1182
1183         lockrep->lock_policy_res2 =
1184                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1185
1186         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1187         if (rc)
1188                 GOTO(out, rc);
1189
1190         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1191         EXIT;
1192
1193 out:
1194         minfo->mi_cb(req, minfo, rc);
1195         return 0;
1196 }
1197
1198 int mdc_intent_getattr_async(struct obd_export *exp,
1199                              struct md_enqueue_info *minfo)
1200 {
1201         struct md_op_data       *op_data = &minfo->mi_data;
1202         struct lookup_intent    *it = &minfo->mi_it;
1203         struct ptlrpc_request   *req;
1204         struct mdc_getattr_args *ga;
1205         struct obd_device       *obddev = class_exp2obd(exp);
1206         struct ldlm_res_id       res_id;
1207         union ldlm_policy_data policy = {
1208                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1209                                                  MDS_INODELOCK_UPDATE } };
1210         int                      rc = 0;
1211         __u64                    flags = LDLM_FL_HAS_INTENT;
1212         ENTRY;
1213
1214         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1215                 (int)op_data->op_namelen, op_data->op_name,
1216                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1217
1218         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1219         req = mdc_intent_getattr_pack(exp, it, op_data);
1220         if (IS_ERR(req))
1221                 RETURN(PTR_ERR(req));
1222
1223         rc = obd_get_request_slot(&obddev->u.cli);
1224         if (rc != 0) {
1225                 ptlrpc_req_finished(req);
1226                 RETURN(rc);
1227         }
1228
1229         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1230                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1231         if (rc < 0) {
1232                 obd_put_request_slot(&obddev->u.cli);
1233                 ptlrpc_req_finished(req);
1234                 RETURN(rc);
1235         }
1236
1237         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1238         ga = ptlrpc_req_async_args(req);
1239         ga->ga_exp = exp;
1240         ga->ga_minfo = minfo;
1241
1242         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1243         ptlrpcd_add_req(req);
1244
1245         RETURN(0);
1246 }