Whamcloud - gitweb
LU-10368 mdc: resend quotactl if needed
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN  250
341 #define GA_DEFAULT_EA_NUM      10
342
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345                          struct lookup_intent *it,
346                          struct md_op_data *op_data)
347 {
348         struct ptlrpc_request   *req;
349         struct ldlm_intent      *lit;
350         int                     rc, count = 0;
351         struct list_head        cancels = LIST_HEAD_INIT(cancels);
352
353         ENTRY;
354
355         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356                                         &RQF_LDLM_INTENT_GETXATTR);
357         if (req == NULL)
358                 RETURN(ERR_PTR(-ENOMEM));
359
360         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
361         if (rc) {
362                 ptlrpc_request_free(req);
363                 RETURN(ERR_PTR(rc));
364         }
365
366         /* pack the intent */
367         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368         lit->opc = IT_GETXATTR;
369
370         /* pack the intended request */
371         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
373                       -1, 0);
374
375         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
377
378         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
385
386         ptlrpc_request_set_replen(req);
387
388         RETURN(req);
389 }
390
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392                                                      struct lookup_intent *it,
393                                                      struct md_op_data *op_data)
394 {
395         struct ptlrpc_request *req;
396         struct obd_device     *obddev = class_exp2obd(exp);
397         struct ldlm_intent    *lit;
398         int                    rc;
399         ENTRY;
400
401         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402                                    &RQF_LDLM_INTENT_UNLINK);
403         if (req == NULL)
404                 RETURN(ERR_PTR(-ENOMEM));
405
406         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407                              op_data->op_namelen + 1);
408
409         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
410         if (rc) {
411                 ptlrpc_request_free(req);
412                 RETURN(ERR_PTR(rc));
413         }
414
415         /* pack the intent */
416         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417         lit->opc = (__u64)it->it_op;
418
419         /* pack the intended request */
420         mdc_unlink_pack(req, op_data);
421
422         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423                              obddev->u.cli.cl_default_mds_easize);
424         ptlrpc_request_set_replen(req);
425         RETURN(req);
426 }
427
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429                                                       struct lookup_intent *it,
430                                                       struct md_op_data *op_data)
431 {
432         struct ptlrpc_request   *req;
433         struct obd_device       *obddev = class_exp2obd(exp);
434         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436                                          OBD_MD_MEA | OBD_MD_FLACL;
437         struct ldlm_intent      *lit;
438         int                      rc;
439         __u32                    easize;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443                                    &RQF_LDLM_INTENT_GETATTR);
444         if (req == NULL)
445                 RETURN(ERR_PTR(-ENOMEM));
446
447         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448                              op_data->op_namelen + 1);
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         /* pack the intent */
457         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458         lit->opc = (__u64)it->it_op;
459
460         if (obddev->u.cli.cl_default_mds_easize > 0)
461                 easize = obddev->u.cli.cl_default_mds_easize;
462         else
463                 easize = obddev->u.cli.cl_max_mds_easize;
464
465         /* pack the intended request */
466         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
467
468         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470                              req->rq_import->imp_connect_data.ocd_max_easize);
471         ptlrpc_request_set_replen(req);
472         RETURN(req);
473 }
474
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476                                                      struct lookup_intent *it,
477                                                      struct md_op_data *op_data)
478 {
479         struct obd_device     *obd = class_exp2obd(exp);
480         struct ptlrpc_request *req;
481         struct ldlm_intent    *lit;
482         struct layout_intent  *layout;
483         int rc;
484         ENTRY;
485
486         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487                                 &RQF_LDLM_INTENT_LAYOUT);
488         if (req == NULL)
489                 RETURN(ERR_PTR(-ENOMEM));
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
493         if (rc) {
494                 ptlrpc_request_free(req);
495                 RETURN(ERR_PTR(rc));
496         }
497
498         /* pack the intent */
499         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500         lit->opc = (__u64)it->it_op;
501
502         /* pack the layout intent request */
503         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504         LASSERT(op_data->op_data != NULL);
505         LASSERT(op_data->op_data_size == sizeof(*layout));
506         memcpy(layout, op_data->op_data, sizeof(*layout));
507
508         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509                              obd->u.cli.cl_default_mds_easize);
510         ptlrpc_request_set_replen(req);
511         RETURN(req);
512 }
513
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
516 {
517         struct ptlrpc_request *req;
518         int rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
522         if (req == NULL)
523                 RETURN(ERR_PTR(-ENOMEM));
524
525         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
526         if (rc) {
527                 ptlrpc_request_free(req);
528                 RETURN(ERR_PTR(rc));
529         }
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532         ptlrpc_request_set_replen(req);
533         RETURN(req);
534 }
535
536 static int mdc_finish_enqueue(struct obd_export *exp,
537                               struct ptlrpc_request *req,
538                               struct ldlm_enqueue_info *einfo,
539                               struct lookup_intent *it,
540                               struct lustre_handle *lockh,
541                               int rc)
542 {
543         struct req_capsule  *pill = &req->rq_pill;
544         struct ldlm_request *lockreq;
545         struct ldlm_reply   *lockrep;
546         struct ldlm_lock    *lock;
547         void                *lvb_data = NULL;
548         __u32                lvb_len = 0;
549         ENTRY;
550
551         LASSERT(rc >= 0);
552         /* Similarly, if we're going to replay this request, we don't want to
553          * actually get a lock, just perform the intent. */
554         if (req->rq_transno || req->rq_replay) {
555                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
556                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
557         }
558
559         if (rc == ELDLM_LOCK_ABORTED) {
560                 einfo->ei_mode = 0;
561                 memset(lockh, 0, sizeof(*lockh));
562                 rc = 0;
563         } else { /* rc = 0 */
564                 lock = ldlm_handle2lock(lockh);
565                 LASSERT(lock != NULL);
566
567                 /* If the server gave us back a different lock mode, we should
568                  * fix up our variables. */
569                 if (lock->l_req_mode != einfo->ei_mode) {
570                         ldlm_lock_addref(lockh, lock->l_req_mode);
571                         ldlm_lock_decref(lockh, einfo->ei_mode);
572                         einfo->ei_mode = lock->l_req_mode;
573                 }
574                 LDLM_LOCK_PUT(lock);
575         }
576
577         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
578         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
579
580         it->it_disposition = (int)lockrep->lock_policy_res1;
581         it->it_status = (int)lockrep->lock_policy_res2;
582         it->it_lock_mode = einfo->ei_mode;
583         it->it_lock_handle = lockh->cookie;
584         it->it_request = req;
585
586         /* Technically speaking rq_transno must already be zero if
587          * it_status is in error, so the check is a bit redundant */
588         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
589                 mdc_clear_replay_flag(req, it->it_status);
590
591         /* If we're doing an IT_OPEN which did not result in an actual
592          * successful open, then we need to remove the bit which saves
593          * this request for unconditional replay.
594          *
595          * It's important that we do this first!  Otherwise we might exit the
596          * function without doing so, and try to replay a failed create
597          * (bug 3440) */
598         if (it->it_op & IT_OPEN && req->rq_replay &&
599             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
600                 mdc_clear_replay_flag(req, it->it_status);
601
602         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
603                   it->it_op, it->it_disposition, it->it_status);
604
605         /* We know what to expect, so we do any byte flipping required here */
606         if (it_has_reply_body(it)) {
607                 struct mdt_body *body;
608
609                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
610                 if (body == NULL) {
611                         CERROR ("Can't swab mdt_body\n");
612                         RETURN (-EPROTO);
613                 }
614
615                 if (it_disposition(it, DISP_OPEN_OPEN) &&
616                     !it_open_error(DISP_OPEN_OPEN, it)) {
617                         /*
618                          * If this is a successful OPEN request, we need to set
619                          * replay handler and data early, so that if replay
620                          * happens immediately after swabbing below, new reply
621                          * is swabbed by that handler correctly.
622                          */
623                         mdc_set_open_replay_data(NULL, NULL, it);
624                 }
625
626                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
627                         void *eadata;
628
629                         mdc_update_max_ea_from_body(exp, body);
630
631                         /*
632                          * The eadata is opaque; just check that it is there.
633                          * Eventually, obd_unpackmd() will check the contents.
634                          */
635                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636                                                         body->mbo_eadatasize);
637                         if (eadata == NULL)
638                                 RETURN(-EPROTO);
639
640                         /* save lvb data and length in case this is for layout
641                          * lock */
642                         lvb_data = eadata;
643                         lvb_len = body->mbo_eadatasize;
644
645                         /*
646                          * We save the reply LOV EA in case we have to replay a
647                          * create for recovery.  If we didn't allocate a large
648                          * enough request buffer above we need to reallocate it
649                          * here to hold the actual LOV EA.
650                          *
651                          * To not save LOV EA if request is not going to replay
652                          * (for example error one).
653                          */
654                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
655                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656                                                     body->mbo_eadatasize);
657                                 if (rc) {
658                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
659                                         body->mbo_eadatasize = 0;
660                                         rc = 0;
661                                 }
662                         }
663                 }
664         } else if (it->it_op & IT_LAYOUT) {
665                 /* maybe the lock was granted right away and layout
666                  * is packed into RMF_DLM_LVB of req */
667                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
668                 if (lvb_len > 0) {
669                         lvb_data = req_capsule_server_sized_get(pill,
670                                                         &RMF_DLM_LVB, lvb_len);
671                         if (lvb_data == NULL)
672                                 RETURN(-EPROTO);
673
674                         /**
675                          * save replied layout data to the request buffer for
676                          * recovery consideration (lest MDS reinitialize
677                          * another set of OST objects).
678                          */
679                         if (req->rq_transno)
680                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
681                                                      lvb_len);
682                 }
683         }
684
685         /* fill in stripe data for layout lock.
686          * LU-6581: trust layout data only if layout lock is granted. The MDT
687          * has stopped sending layout unless the layout lock is granted. The
688          * client still does this checking in case it's talking with an old
689          * server. - Jinshan */
690         lock = ldlm_handle2lock(lockh);
691         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
692             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
693                 void *lmm;
694
695                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
696                         ldlm_it2str(it->it_op), lvb_len);
697
698                 OBD_ALLOC_LARGE(lmm, lvb_len);
699                 if (lmm == NULL) {
700                         LDLM_LOCK_PUT(lock);
701                         RETURN(-ENOMEM);
702                 }
703                 memcpy(lmm, lvb_data, lvb_len);
704
705                 /* install lvb_data */
706                 lock_res_and_lock(lock);
707                 if (lock->l_lvb_data == NULL) {
708                         lock->l_lvb_type = LVB_T_LAYOUT;
709                         lock->l_lvb_data = lmm;
710                         lock->l_lvb_len = lvb_len;
711                         lmm = NULL;
712                 }
713                 unlock_res_and_lock(lock);
714                 if (lmm != NULL)
715                         OBD_FREE_LARGE(lmm, lvb_len);
716         }
717         if (lock != NULL)
718                 LDLM_LOCK_PUT(lock);
719
720         RETURN(rc);
721 }
722
723 /* We always reserve enough space in the reply packet for a stripe MD, because
724  * we don't know in advance the file type. */
725 static int mdc_enqueue_base(struct obd_export *exp,
726                             struct ldlm_enqueue_info *einfo,
727                             const union ldlm_policy_data *policy,
728                             struct lookup_intent *it,
729                             struct md_op_data *op_data,
730                             struct lustre_handle *lockh,
731                             __u64 extra_lock_flags)
732 {
733         struct obd_device *obddev = class_exp2obd(exp);
734         struct ptlrpc_request *req = NULL;
735         __u64 flags, saved_flags = extra_lock_flags;
736         struct ldlm_res_id res_id;
737         static const union ldlm_policy_data lookup_policy = {
738                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739         static const union ldlm_policy_data update_policy = {
740                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
741         static const union ldlm_policy_data layout_policy = {
742                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743         static const union ldlm_policy_data getxattr_policy = {
744                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
745         int generation, resends = 0;
746         struct ldlm_reply *lockrep;
747         enum lvb_type lvb_type = 0;
748         int rc;
749         ENTRY;
750
751         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
752                  einfo->ei_type);
753         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
754
755         if (it != NULL) {
756                 LASSERT(policy == NULL);
757
758                 saved_flags |= LDLM_FL_HAS_INTENT;
759                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
760                         policy = &update_policy;
761                 else if (it->it_op & IT_LAYOUT)
762                         policy = &layout_policy;
763                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
764                         policy = &getxattr_policy;
765                 else
766                         policy = &lookup_policy;
767         }
768
769         generation = obddev->u.cli.cl_import->imp_generation;
770 resend:
771         flags = saved_flags;
772         if (it == NULL) {
773                 /* The only way right now is FLOCK. */
774                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
775                          einfo->ei_type);
776                 res_id.name[3] = LDLM_FLOCK;
777         } else if (it->it_op & IT_OPEN) {
778                 req = mdc_intent_open_pack(exp, it, op_data);
779         } else if (it->it_op & IT_UNLINK) {
780                 req = mdc_intent_unlink_pack(exp, it, op_data);
781         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
782                 req = mdc_intent_getattr_pack(exp, it, op_data);
783         } else if (it->it_op & IT_READDIR) {
784                 req = mdc_enqueue_pack(exp, 0);
785         } else if (it->it_op & IT_LAYOUT) {
786                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
787                         RETURN(-EOPNOTSUPP);
788                 req = mdc_intent_layout_pack(exp, it, op_data);
789                 lvb_type = LVB_T_LAYOUT;
790         } else if (it->it_op & IT_GETXATTR) {
791                 req = mdc_intent_getxattr_pack(exp, it, op_data);
792         } else {
793                 LBUG();
794                 RETURN(-EINVAL);
795         }
796
797         if (IS_ERR(req))
798                 RETURN(PTR_ERR(req));
799
800         if (resends) {
801                 req->rq_generation_set = 1;
802                 req->rq_import_generation = generation;
803                 req->rq_sent = ktime_get_real_seconds() + resends;
804         }
805
806         /* It is important to obtain modify RPC slot first (if applicable), so
807          * that threads that are waiting for a modify RPC slot are not polluting
808          * our rpcs in flight counter.
809          * We do not do flock request limiting, though */
810         if (it) {
811                 mdc_get_mod_rpc_slot(req, it);
812                 rc = obd_get_request_slot(&obddev->u.cli);
813                 if (rc != 0) {
814                         mdc_put_mod_rpc_slot(req, it);
815                         mdc_clear_replay_flag(req, 0);
816                         ptlrpc_req_finished(req);
817                         RETURN(rc);
818                 }
819         }
820
821         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
822                               0, lvb_type, lockh, 0);
823         if (!it) {
824                 /* For flock requests we immediatelly return without further
825                    delay and let caller deal with the rest, since rest of
826                    this function metadata processing makes no sense for flock
827                    requests anyway. But in case of problem during comms with
828                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
829                    can not rely on caller and this mainly for F_UNLCKs
830                    (explicits or automatically generated by Kernel to clean
831                    current FLocks upon exit) that can't be trashed */
832                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
833                     (einfo->ei_type == LDLM_FLOCK) &&
834                     (einfo->ei_mode == LCK_NL))
835                         goto resend;
836                 RETURN(rc);
837         }
838
839         obd_put_request_slot(&obddev->u.cli);
840         mdc_put_mod_rpc_slot(req, it);
841
842         if (rc < 0) {
843                 CDEBUG(D_INFO,
844                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
845                       obddev->obd_name, PFID(&op_data->op_fid1),
846                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
847
848                 mdc_clear_replay_flag(req, rc);
849                 ptlrpc_req_finished(req);
850                 RETURN(rc);
851         }
852
853         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
854         LASSERT(lockrep != NULL);
855
856         lockrep->lock_policy_res2 =
857                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
858
859         /* Retry infinitely when the server returns -EINPROGRESS for the
860          * intent operation, when server returns -EINPROGRESS for acquiring
861          * intent lock, we'll retry in after_reply(). */
862         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
863                 mdc_clear_replay_flag(req, rc);
864                 ptlrpc_req_finished(req);
865                 if (generation == obddev->u.cli.cl_import->imp_generation) {
866                         if (signal_pending(current))
867                                 RETURN(-EINTR);
868
869                         resends++;
870                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
871                                obddev->obd_name, resends, it->it_op,
872                                PFID(&op_data->op_fid1),
873                                PFID(&op_data->op_fid2));
874                         goto resend;
875                 } else {
876                         CDEBUG(D_HA, "resend cross eviction\n");
877                         RETURN(-EIO);
878                 }
879         }
880
881         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
882         if (rc < 0) {
883                 if (lustre_handle_is_used(lockh)) {
884                         ldlm_lock_decref(lockh, einfo->ei_mode);
885                         memset(lockh, 0, sizeof(*lockh));
886                 }
887                 ptlrpc_req_finished(req);
888
889                 it->it_lock_handle = 0;
890                 it->it_lock_mode = 0;
891                 it->it_request = NULL;
892         }
893
894         RETURN(rc);
895 }
896
897 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
898                 const union ldlm_policy_data *policy,
899                 struct md_op_data *op_data,
900                 struct lustre_handle *lockh, __u64 extra_lock_flags)
901 {
902         return mdc_enqueue_base(exp, einfo, policy, NULL,
903                                 op_data, lockh, extra_lock_flags);
904 }
905
906 static int mdc_finish_intent_lock(struct obd_export *exp,
907                                   struct ptlrpc_request *request,
908                                   struct md_op_data *op_data,
909                                   struct lookup_intent *it,
910                                   struct lustre_handle *lockh)
911 {
912         struct lustre_handle old_lock;
913         struct ldlm_lock *lock;
914         int rc = 0;
915         ENTRY;
916
917         LASSERT(request != NULL);
918         LASSERT(request != LP_POISON);
919         LASSERT(request->rq_repmsg != LP_POISON);
920
921         if (it->it_op & IT_READDIR)
922                 RETURN(0);
923
924         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
925                 if (it->it_status != 0)
926                         GOTO(out, rc = it->it_status);
927         } else {
928                 if (!it_disposition(it, DISP_IT_EXECD)) {
929                         /* The server failed before it even started executing
930                          * the intent, i.e. because it couldn't unpack the
931                          * request.
932                          */
933                         LASSERT(it->it_status != 0);
934                         GOTO(out, rc = it->it_status);
935                 }
936                 rc = it_open_error(DISP_IT_EXECD, it);
937                 if (rc)
938                         GOTO(out, rc);
939
940                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
941                 if (rc)
942                         GOTO(out, rc);
943
944                 /* keep requests around for the multiple phases of the call
945                  * this shows the DISP_XX must guarantee we make it into the
946                  * call
947                  */
948                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
949                     it_disposition(it, DISP_OPEN_CREATE) &&
950                     !it_open_error(DISP_OPEN_CREATE, it)) {
951                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
952                         /* balanced in ll_create_node */
953                         ptlrpc_request_addref(request);
954                 }
955                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
956                     it_disposition(it, DISP_OPEN_OPEN) &&
957                     !it_open_error(DISP_OPEN_OPEN, it)) {
958                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
959                         /* balanced in ll_file_open */
960                         ptlrpc_request_addref(request);
961                         /* BUG 11546 - eviction in the middle of open rpc
962                          * processing
963                          */
964                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
965                                          obd_timeout);
966                 }
967
968                 if (it->it_op & IT_CREAT) {
969                         /* XXX this belongs in ll_create_it */
970                 } else if (it->it_op == IT_OPEN) {
971                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
972                 } else {
973                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
974                 }
975         }
976
977         /* If we already have a matching lock, then cancel the new
978          * one.  We have to set the data here instead of in
979          * mdc_enqueue, because we need to use the child's inode as
980          * the l_ast_data to match, and that's not available until
981          * intent_finish has performed the iget().) */
982         lock = ldlm_handle2lock(lockh);
983         if (lock) {
984                 union ldlm_policy_data policy = lock->l_policy_data;
985                 LDLM_DEBUG(lock, "matching against this");
986
987                 if (it_has_reply_body(it)) {
988                         struct mdt_body *body;
989
990                         body = req_capsule_server_get(&request->rq_pill,
991                                                       &RMF_MDT_BODY);
992                         /* mdc_enqueue checked */
993                         LASSERT(body != NULL);
994                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
995                                                  &lock->l_resource->lr_name),
996                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
997                                  PLDLMRES(lock->l_resource),
998                                  PFID(&body->mbo_fid1));
999                 }
1000                 LDLM_LOCK_PUT(lock);
1001
1002                 memcpy(&old_lock, lockh, sizeof(*lockh));
1003                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1004                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1005                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1006                         memcpy(lockh, &old_lock, sizeof(old_lock));
1007                         it->it_lock_handle = lockh->cookie;
1008                 }
1009         }
1010
1011         EXIT;
1012 out:
1013         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1014                 (int)op_data->op_namelen, op_data->op_name,
1015                 ldlm_it2str(it->it_op), it->it_status,
1016                 it->it_disposition, rc);
1017         return rc;
1018 }
1019
1020 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1021                         struct lu_fid *fid, __u64 *bits)
1022 {
1023         /* We could just return 1 immediately, but since we should only
1024          * be called in revalidate_it if we already have a lock, let's
1025          * verify that. */
1026         struct ldlm_res_id res_id;
1027         struct lustre_handle lockh;
1028         union ldlm_policy_data policy;
1029         enum ldlm_mode mode;
1030         ENTRY;
1031
1032         if (it->it_lock_handle) {
1033                 lockh.cookie = it->it_lock_handle;
1034                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1035         } else {
1036                 fid_build_reg_res_name(fid, &res_id);
1037                 switch (it->it_op) {
1038                 case IT_GETATTR:
1039                         /* File attributes are held under multiple bits:
1040                          * nlink is under lookup lock, size and times are
1041                          * under UPDATE lock and recently we've also got
1042                          * a separate permissions lock for owner/group/acl that
1043                          * were protected by lookup lock before.
1044                          * Getattr must provide all of that information,
1045                          * so we need to ensure we have all of those locks.
1046                          * Unfortunately, if the bits are split across multiple
1047                          * locks, there's no easy way to match all of them here,
1048                          * so an extra RPC would be performed to fetch all
1049                          * of those bits at once for now. */
1050                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1051                          * but for old MDTs (< 2.4), permission is covered
1052                          * by LOOKUP lock, so it needs to match all bits here.*/
1053                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1054                                                   MDS_INODELOCK_LOOKUP |
1055                                                   MDS_INODELOCK_PERM;
1056                         break;
1057                 case IT_READDIR:
1058                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1059                         break;
1060                 case IT_LAYOUT:
1061                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1062                         break;
1063                 default:
1064                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1065                         break;
1066                 }
1067
1068                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1069                                       LDLM_IBITS, &policy,
1070                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1071                                       &lockh);
1072         }
1073
1074         if (mode) {
1075                 it->it_lock_handle = lockh.cookie;
1076                 it->it_lock_mode = mode;
1077         } else {
1078                 it->it_lock_handle = 0;
1079                 it->it_lock_mode = 0;
1080         }
1081
1082         RETURN(!!mode);
1083 }
1084
1085 /*
1086  * This long block is all about fixing up the lock and request state
1087  * so that it is correct as of the moment _before_ the operation was
1088  * applied; that way, the VFS will think that everything is normal and
1089  * call Lustre's regular VFS methods.
1090  *
1091  * If we're performing a creation, that means that unless the creation
1092  * failed with EEXIST, we should fake up a negative dentry.
1093  *
1094  * For everything else, we want to lookup to succeed.
1095  *
1096  * One additional note: if CREATE or OPEN succeeded, we add an extra
1097  * reference to the request because we need to keep it around until
1098  * ll_create/ll_open gets called.
1099  *
1100  * The server will return to us, in it_disposition, an indication of
1101  * exactly what it_status refers to.
1102  *
1103  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1104  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1105  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1106  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1107  * was successful.
1108  *
1109  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1110  * child lookup.
1111  */
1112 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1113                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1114                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1115 {
1116         struct ldlm_enqueue_info einfo = {
1117                 .ei_type        = LDLM_IBITS,
1118                 .ei_mode        = it_to_lock_mode(it),
1119                 .ei_cb_bl       = cb_blocking,
1120                 .ei_cb_cp       = ldlm_completion_ast,
1121         };
1122         struct lustre_handle lockh;
1123         int rc = 0;
1124         ENTRY;
1125         LASSERT(it);
1126
1127         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1128                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1129                 op_data->op_name, PFID(&op_data->op_fid2),
1130                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1131                 it->it_flags);
1132
1133         lockh.cookie = 0;
1134         if (fid_is_sane(&op_data->op_fid2) &&
1135             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1136                 /* We could just return 1 immediately, but since we should only
1137                  * be called in revalidate_it if we already have a lock, let's
1138                  * verify that. */
1139                 it->it_lock_handle = 0;
1140                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1141                 /* Only return failure if it was not GETATTR by cfid
1142                    (from inode_revalidate) */
1143                 if (rc || op_data->op_namelen != 0)
1144                         RETURN(rc);
1145         }
1146
1147         /* For case if upper layer did not alloc fid, do it now. */
1148         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1149                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1150                 if (rc < 0) {
1151                         CERROR("Can't alloc new fid, rc %d\n", rc);
1152                         RETURN(rc);
1153                 }
1154         }
1155
1156         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1157                               extra_lock_flags);
1158         if (rc < 0)
1159                 RETURN(rc);
1160
1161         *reqp = it->it_request;
1162         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1163         RETURN(rc);
1164 }
1165
1166 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1167                                               struct ptlrpc_request *req,
1168                                               void *args, int rc)
1169 {
1170         struct mdc_getattr_args  *ga = args;
1171         struct obd_export        *exp = ga->ga_exp;
1172         struct md_enqueue_info   *minfo = ga->ga_minfo;
1173         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1174         struct lookup_intent     *it;
1175         struct lustre_handle     *lockh;
1176         struct obd_device        *obddev;
1177         struct ldlm_reply        *lockrep;
1178         __u64                     flags = LDLM_FL_HAS_INTENT;
1179         ENTRY;
1180
1181         it    = &minfo->mi_it;
1182         lockh = &minfo->mi_lockh;
1183
1184         obddev = class_exp2obd(exp);
1185
1186         obd_put_request_slot(&obddev->u.cli);
1187         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1188                 rc = -ETIMEDOUT;
1189
1190         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1191                                    &flags, NULL, 0, lockh, rc);
1192         if (rc < 0) {
1193                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1194                 mdc_clear_replay_flag(req, rc);
1195                 GOTO(out, rc);
1196         }
1197
1198         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1199         LASSERT(lockrep != NULL);
1200
1201         lockrep->lock_policy_res2 =
1202                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1203
1204         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1205         if (rc)
1206                 GOTO(out, rc);
1207
1208         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1209         EXIT;
1210
1211 out:
1212         minfo->mi_cb(req, minfo, rc);
1213         return 0;
1214 }
1215
1216 int mdc_intent_getattr_async(struct obd_export *exp,
1217                              struct md_enqueue_info *minfo)
1218 {
1219         struct md_op_data       *op_data = &minfo->mi_data;
1220         struct lookup_intent    *it = &minfo->mi_it;
1221         struct ptlrpc_request   *req;
1222         struct mdc_getattr_args *ga;
1223         struct obd_device       *obddev = class_exp2obd(exp);
1224         struct ldlm_res_id       res_id;
1225         union ldlm_policy_data policy = {
1226                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1227                                                  MDS_INODELOCK_UPDATE } };
1228         int                      rc = 0;
1229         __u64                    flags = LDLM_FL_HAS_INTENT;
1230         ENTRY;
1231
1232         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1233                 (int)op_data->op_namelen, op_data->op_name,
1234                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1235
1236         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1237         req = mdc_intent_getattr_pack(exp, it, op_data);
1238         if (IS_ERR(req))
1239                 RETURN(PTR_ERR(req));
1240
1241         rc = obd_get_request_slot(&obddev->u.cli);
1242         if (rc != 0) {
1243                 ptlrpc_req_finished(req);
1244                 RETURN(rc);
1245         }
1246
1247         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1248                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1249         if (rc < 0) {
1250                 obd_put_request_slot(&obddev->u.cli);
1251                 ptlrpc_req_finished(req);
1252                 RETURN(rc);
1253         }
1254
1255         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1256         ga = ptlrpc_req_async_args(req);
1257         ga->ga_exp = exp;
1258         ga->ga_minfo = minfo;
1259
1260         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1261         ptlrpcd_add_req(req);
1262
1263         RETURN(0);
1264 }