Whamcloud - gitweb
LU-10912 mdc: use large xattr buffers for old servers
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN  250
341 #define GA_DEFAULT_EA_NUM      10
342
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345                          struct lookup_intent *it,
346                          struct md_op_data *op_data)
347 {
348         struct ptlrpc_request   *req;
349         struct ldlm_intent      *lit;
350         int                     rc, count = 0;
351         struct list_head        cancels = LIST_HEAD_INIT(cancels);
352         u32 min_buf_size = 0;
353
354         ENTRY;
355
356         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
357                                         &RQF_LDLM_INTENT_GETXATTR);
358         if (req == NULL)
359                 RETURN(ERR_PTR(-ENOMEM));
360
361         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
362         if (rc) {
363                 ptlrpc_request_free(req);
364                 RETURN(ERR_PTR(rc));
365         }
366
367         /* pack the intent */
368         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
369         lit->opc = IT_GETXATTR;
370
371 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
372         /* If the supplied buffer is too small then the server will
373          * return -ERANGE and llite will fallback to using non cached
374          * xattr operations. On servers before 2.10.1 a (non-cached)
375          * listxattr RPC for an orphan or dead file causes an oops. So
376          * let's try to avoid sending too small a buffer to too old a
377          * server. This is effectively undoing the memory conservation
378          * of LU-9417 when it would be *more* likely to crash the
379          * server. See LU-9856. */
380         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
381                 min_buf_size = exp->exp_connect_data.ocd_max_easize;
382 #endif
383
384         /* pack the intended request */
385         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
386                       max_t(u32, min_buf_size,
387                           GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM),
388                       -1, 0);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
391                              max_t(u32, min_buf_size,
392                                  GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
393
394         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
395                              max_t(u32, min_buf_size,
396                                  GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
397
398         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
399                              max_t(u32, min_buf_size,
400                                  sizeof(__u32) * GA_DEFAULT_EA_NUM));
401
402         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
403
404         ptlrpc_request_set_replen(req);
405
406         RETURN(req);
407 }
408
409 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
410                                                      struct lookup_intent *it,
411                                                      struct md_op_data *op_data)
412 {
413         struct ptlrpc_request *req;
414         struct obd_device     *obddev = class_exp2obd(exp);
415         struct ldlm_intent    *lit;
416         int                    rc;
417         ENTRY;
418
419         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
420                                    &RQF_LDLM_INTENT_UNLINK);
421         if (req == NULL)
422                 RETURN(ERR_PTR(-ENOMEM));
423
424         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425                              op_data->op_namelen + 1);
426
427         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
428         if (rc) {
429                 ptlrpc_request_free(req);
430                 RETURN(ERR_PTR(rc));
431         }
432
433         /* pack the intent */
434         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435         lit->opc = (__u64)it->it_op;
436
437         /* pack the intended request */
438         mdc_unlink_pack(req, op_data);
439
440         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
441                              obddev->u.cli.cl_default_mds_easize);
442         ptlrpc_request_set_replen(req);
443         RETURN(req);
444 }
445
446 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
447                                                       struct lookup_intent *it,
448                                                       struct md_op_data *op_data)
449 {
450         struct ptlrpc_request   *req;
451         struct obd_device       *obddev = class_exp2obd(exp);
452         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
453                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
454                                          OBD_MD_MEA | OBD_MD_FLACL;
455         struct ldlm_intent      *lit;
456         int                      rc;
457         __u32                    easize;
458         ENTRY;
459
460         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
461                                    &RQF_LDLM_INTENT_GETATTR);
462         if (req == NULL)
463                 RETURN(ERR_PTR(-ENOMEM));
464
465         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
466                              op_data->op_namelen + 1);
467
468         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
469         if (rc) {
470                 ptlrpc_request_free(req);
471                 RETURN(ERR_PTR(rc));
472         }
473
474         /* pack the intent */
475         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
476         lit->opc = (__u64)it->it_op;
477
478         if (obddev->u.cli.cl_default_mds_easize > 0)
479                 easize = obddev->u.cli.cl_default_mds_easize;
480         else
481                 easize = obddev->u.cli.cl_max_mds_easize;
482
483         /* pack the intended request */
484         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
485
486         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
487         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
488                              req->rq_import->imp_connect_data.ocd_max_easize);
489         ptlrpc_request_set_replen(req);
490         RETURN(req);
491 }
492
493 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
494                                                      struct lookup_intent *it,
495                                                      struct md_op_data *op_data)
496 {
497         struct obd_device     *obd = class_exp2obd(exp);
498         struct ptlrpc_request *req;
499         struct ldlm_intent    *lit;
500         struct layout_intent  *layout;
501         int rc;
502         ENTRY;
503
504         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
505                                 &RQF_LDLM_INTENT_LAYOUT);
506         if (req == NULL)
507                 RETURN(ERR_PTR(-ENOMEM));
508
509         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
510         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
511         if (rc) {
512                 ptlrpc_request_free(req);
513                 RETURN(ERR_PTR(rc));
514         }
515
516         /* pack the intent */
517         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
518         lit->opc = (__u64)it->it_op;
519
520         /* pack the layout intent request */
521         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
522         LASSERT(op_data->op_data != NULL);
523         LASSERT(op_data->op_data_size == sizeof(*layout));
524         memcpy(layout, op_data->op_data, sizeof(*layout));
525
526         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
527                              obd->u.cli.cl_default_mds_easize);
528         ptlrpc_request_set_replen(req);
529         RETURN(req);
530 }
531
532 static struct ptlrpc_request *
533 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
534 {
535         struct ptlrpc_request *req;
536         int rc;
537         ENTRY;
538
539         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
540         if (req == NULL)
541                 RETURN(ERR_PTR(-ENOMEM));
542
543         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
544         if (rc) {
545                 ptlrpc_request_free(req);
546                 RETURN(ERR_PTR(rc));
547         }
548
549         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
550         ptlrpc_request_set_replen(req);
551         RETURN(req);
552 }
553
554 static int mdc_finish_enqueue(struct obd_export *exp,
555                               struct ptlrpc_request *req,
556                               struct ldlm_enqueue_info *einfo,
557                               struct lookup_intent *it,
558                               struct lustre_handle *lockh,
559                               int rc)
560 {
561         struct req_capsule  *pill = &req->rq_pill;
562         struct ldlm_request *lockreq;
563         struct ldlm_reply   *lockrep;
564         struct ldlm_lock    *lock;
565         void                *lvb_data = NULL;
566         __u32                lvb_len = 0;
567         ENTRY;
568
569         LASSERT(rc >= 0);
570         /* Similarly, if we're going to replay this request, we don't want to
571          * actually get a lock, just perform the intent. */
572         if (req->rq_transno || req->rq_replay) {
573                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
574                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
575         }
576
577         if (rc == ELDLM_LOCK_ABORTED) {
578                 einfo->ei_mode = 0;
579                 memset(lockh, 0, sizeof(*lockh));
580                 rc = 0;
581         } else { /* rc = 0 */
582                 lock = ldlm_handle2lock(lockh);
583                 LASSERT(lock != NULL);
584
585                 /* If the server gave us back a different lock mode, we should
586                  * fix up our variables. */
587                 if (lock->l_req_mode != einfo->ei_mode) {
588                         ldlm_lock_addref(lockh, lock->l_req_mode);
589                         ldlm_lock_decref(lockh, einfo->ei_mode);
590                         einfo->ei_mode = lock->l_req_mode;
591                 }
592                 LDLM_LOCK_PUT(lock);
593         }
594
595         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
596         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
597
598         it->it_disposition = (int)lockrep->lock_policy_res1;
599         it->it_status = (int)lockrep->lock_policy_res2;
600         it->it_lock_mode = einfo->ei_mode;
601         it->it_lock_handle = lockh->cookie;
602         it->it_request = req;
603
604         /* Technically speaking rq_transno must already be zero if
605          * it_status is in error, so the check is a bit redundant */
606         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
607                 mdc_clear_replay_flag(req, it->it_status);
608
609         /* If we're doing an IT_OPEN which did not result in an actual
610          * successful open, then we need to remove the bit which saves
611          * this request for unconditional replay.
612          *
613          * It's important that we do this first!  Otherwise we might exit the
614          * function without doing so, and try to replay a failed create
615          * (bug 3440) */
616         if (it->it_op & IT_OPEN && req->rq_replay &&
617             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
618                 mdc_clear_replay_flag(req, it->it_status);
619
620         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
621                   it->it_op, it->it_disposition, it->it_status);
622
623         /* We know what to expect, so we do any byte flipping required here */
624         if (it_has_reply_body(it)) {
625                 struct mdt_body *body;
626
627                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
628                 if (body == NULL) {
629                         CERROR ("Can't swab mdt_body\n");
630                         RETURN (-EPROTO);
631                 }
632
633                 if (it_disposition(it, DISP_OPEN_OPEN) &&
634                     !it_open_error(DISP_OPEN_OPEN, it)) {
635                         /*
636                          * If this is a successful OPEN request, we need to set
637                          * replay handler and data early, so that if replay
638                          * happens immediately after swabbing below, new reply
639                          * is swabbed by that handler correctly.
640                          */
641                         mdc_set_open_replay_data(NULL, NULL, it);
642                 }
643
644                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
645                         void *eadata;
646
647                         mdc_update_max_ea_from_body(exp, body);
648
649                         /*
650                          * The eadata is opaque; just check that it is there.
651                          * Eventually, obd_unpackmd() will check the contents.
652                          */
653                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
654                                                         body->mbo_eadatasize);
655                         if (eadata == NULL)
656                                 RETURN(-EPROTO);
657
658                         /* save lvb data and length in case this is for layout
659                          * lock */
660                         lvb_data = eadata;
661                         lvb_len = body->mbo_eadatasize;
662
663                         /*
664                          * We save the reply LOV EA in case we have to replay a
665                          * create for recovery.  If we didn't allocate a large
666                          * enough request buffer above we need to reallocate it
667                          * here to hold the actual LOV EA.
668                          *
669                          * To not save LOV EA if request is not going to replay
670                          * (for example error one).
671                          */
672                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
673                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
674                                                     body->mbo_eadatasize);
675                                 if (rc) {
676                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
677                                         body->mbo_eadatasize = 0;
678                                         rc = 0;
679                                 }
680                         }
681                 }
682         } else if (it->it_op & IT_LAYOUT) {
683                 /* maybe the lock was granted right away and layout
684                  * is packed into RMF_DLM_LVB of req */
685                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
686                 if (lvb_len > 0) {
687                         lvb_data = req_capsule_server_sized_get(pill,
688                                                         &RMF_DLM_LVB, lvb_len);
689                         if (lvb_data == NULL)
690                                 RETURN(-EPROTO);
691
692                         /**
693                          * save replied layout data to the request buffer for
694                          * recovery consideration (lest MDS reinitialize
695                          * another set of OST objects).
696                          */
697                         if (req->rq_transno)
698                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
699                                                      lvb_len);
700                 }
701         }
702
703         /* fill in stripe data for layout lock.
704          * LU-6581: trust layout data only if layout lock is granted. The MDT
705          * has stopped sending layout unless the layout lock is granted. The
706          * client still does this checking in case it's talking with an old
707          * server. - Jinshan */
708         lock = ldlm_handle2lock(lockh);
709         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
710             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
711                 void *lmm;
712
713                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
714                         ldlm_it2str(it->it_op), lvb_len);
715
716                 OBD_ALLOC_LARGE(lmm, lvb_len);
717                 if (lmm == NULL) {
718                         LDLM_LOCK_PUT(lock);
719                         RETURN(-ENOMEM);
720                 }
721                 memcpy(lmm, lvb_data, lvb_len);
722
723                 /* install lvb_data */
724                 lock_res_and_lock(lock);
725                 if (lock->l_lvb_data == NULL) {
726                         lock->l_lvb_type = LVB_T_LAYOUT;
727                         lock->l_lvb_data = lmm;
728                         lock->l_lvb_len = lvb_len;
729                         lmm = NULL;
730                 }
731                 unlock_res_and_lock(lock);
732                 if (lmm != NULL)
733                         OBD_FREE_LARGE(lmm, lvb_len);
734         }
735         if (lock != NULL)
736                 LDLM_LOCK_PUT(lock);
737
738         RETURN(rc);
739 }
740
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742  * we don't know in advance the file type. */
743 static int mdc_enqueue_base(struct obd_export *exp,
744                             struct ldlm_enqueue_info *einfo,
745                             const union ldlm_policy_data *policy,
746                             struct lookup_intent *it,
747                             struct md_op_data *op_data,
748                             struct lustre_handle *lockh,
749                             __u64 extra_lock_flags)
750 {
751         struct obd_device *obddev = class_exp2obd(exp);
752         struct ptlrpc_request *req = NULL;
753         __u64 flags, saved_flags = extra_lock_flags;
754         struct ldlm_res_id res_id;
755         static const union ldlm_policy_data lookup_policy = {
756                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
757         static const union ldlm_policy_data update_policy = {
758                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
759         static const union ldlm_policy_data layout_policy = {
760                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
761         static const union ldlm_policy_data getxattr_policy = {
762                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
763         int generation, resends = 0;
764         struct ldlm_reply *lockrep;
765         enum lvb_type lvb_type = 0;
766         int rc;
767         ENTRY;
768
769         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
770                  einfo->ei_type);
771         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
772
773         if (it != NULL) {
774                 LASSERT(policy == NULL);
775
776                 saved_flags |= LDLM_FL_HAS_INTENT;
777                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
778                         policy = &update_policy;
779                 else if (it->it_op & IT_LAYOUT)
780                         policy = &layout_policy;
781                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
782                         policy = &getxattr_policy;
783                 else
784                         policy = &lookup_policy;
785         }
786
787         generation = obddev->u.cli.cl_import->imp_generation;
788 resend:
789         flags = saved_flags;
790         if (it == NULL) {
791                 /* The only way right now is FLOCK. */
792                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
793                          einfo->ei_type);
794                 res_id.name[3] = LDLM_FLOCK;
795         } else if (it->it_op & IT_OPEN) {
796                 req = mdc_intent_open_pack(exp, it, op_data);
797         } else if (it->it_op & IT_UNLINK) {
798                 req = mdc_intent_unlink_pack(exp, it, op_data);
799         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
800                 req = mdc_intent_getattr_pack(exp, it, op_data);
801         } else if (it->it_op & IT_READDIR) {
802                 req = mdc_enqueue_pack(exp, 0);
803         } else if (it->it_op & IT_LAYOUT) {
804                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
805                         RETURN(-EOPNOTSUPP);
806                 req = mdc_intent_layout_pack(exp, it, op_data);
807                 lvb_type = LVB_T_LAYOUT;
808         } else if (it->it_op & IT_GETXATTR) {
809                 req = mdc_intent_getxattr_pack(exp, it, op_data);
810         } else {
811                 LBUG();
812                 RETURN(-EINVAL);
813         }
814
815         if (IS_ERR(req))
816                 RETURN(PTR_ERR(req));
817
818         if (resends) {
819                 req->rq_generation_set = 1;
820                 req->rq_import_generation = generation;
821                 req->rq_sent = ktime_get_real_seconds() + resends;
822         }
823
824         /* It is important to obtain modify RPC slot first (if applicable), so
825          * that threads that are waiting for a modify RPC slot are not polluting
826          * our rpcs in flight counter.
827          * We do not do flock request limiting, though */
828         if (it) {
829                 mdc_get_mod_rpc_slot(req, it);
830                 rc = obd_get_request_slot(&obddev->u.cli);
831                 if (rc != 0) {
832                         mdc_put_mod_rpc_slot(req, it);
833                         mdc_clear_replay_flag(req, 0);
834                         ptlrpc_req_finished(req);
835                         RETURN(rc);
836                 }
837         }
838
839         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
840                               0, lvb_type, lockh, 0);
841         if (!it) {
842                 /* For flock requests we immediatelly return without further
843                    delay and let caller deal with the rest, since rest of
844                    this function metadata processing makes no sense for flock
845                    requests anyway. But in case of problem during comms with
846                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
847                    can not rely on caller and this mainly for F_UNLCKs
848                    (explicits or automatically generated by Kernel to clean
849                    current FLocks upon exit) that can't be trashed */
850                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
851                     (einfo->ei_type == LDLM_FLOCK) &&
852                     (einfo->ei_mode == LCK_NL))
853                         goto resend;
854                 RETURN(rc);
855         }
856
857         obd_put_request_slot(&obddev->u.cli);
858         mdc_put_mod_rpc_slot(req, it);
859
860         if (rc < 0) {
861                 CDEBUG(D_INFO,
862                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
863                       obddev->obd_name, PFID(&op_data->op_fid1),
864                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
865
866                 mdc_clear_replay_flag(req, rc);
867                 ptlrpc_req_finished(req);
868                 RETURN(rc);
869         }
870
871         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
872         LASSERT(lockrep != NULL);
873
874         lockrep->lock_policy_res2 =
875                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
876
877         /* Retry infinitely when the server returns -EINPROGRESS for the
878          * intent operation, when server returns -EINPROGRESS for acquiring
879          * intent lock, we'll retry in after_reply(). */
880         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
881                 mdc_clear_replay_flag(req, rc);
882                 ptlrpc_req_finished(req);
883                 if (generation == obddev->u.cli.cl_import->imp_generation) {
884                         if (signal_pending(current))
885                                 RETURN(-EINTR);
886
887                         resends++;
888                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
889                                obddev->obd_name, resends, it->it_op,
890                                PFID(&op_data->op_fid1),
891                                PFID(&op_data->op_fid2));
892                         goto resend;
893                 } else {
894                         CDEBUG(D_HA, "resend cross eviction\n");
895                         RETURN(-EIO);
896                 }
897         }
898
899         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
900         if (rc < 0) {
901                 if (lustre_handle_is_used(lockh)) {
902                         ldlm_lock_decref(lockh, einfo->ei_mode);
903                         memset(lockh, 0, sizeof(*lockh));
904                 }
905                 ptlrpc_req_finished(req);
906
907                 it->it_lock_handle = 0;
908                 it->it_lock_mode = 0;
909                 it->it_request = NULL;
910         }
911
912         RETURN(rc);
913 }
914
915 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
916                 const union ldlm_policy_data *policy,
917                 struct md_op_data *op_data,
918                 struct lustre_handle *lockh, __u64 extra_lock_flags)
919 {
920         return mdc_enqueue_base(exp, einfo, policy, NULL,
921                                 op_data, lockh, extra_lock_flags);
922 }
923
924 static int mdc_finish_intent_lock(struct obd_export *exp,
925                                   struct ptlrpc_request *request,
926                                   struct md_op_data *op_data,
927                                   struct lookup_intent *it,
928                                   struct lustre_handle *lockh)
929 {
930         struct lustre_handle old_lock;
931         struct ldlm_lock *lock;
932         int rc = 0;
933         ENTRY;
934
935         LASSERT(request != NULL);
936         LASSERT(request != LP_POISON);
937         LASSERT(request->rq_repmsg != LP_POISON);
938
939         if (it->it_op & IT_READDIR)
940                 RETURN(0);
941
942         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
943                 if (it->it_status != 0)
944                         GOTO(out, rc = it->it_status);
945         } else {
946                 if (!it_disposition(it, DISP_IT_EXECD)) {
947                         /* The server failed before it even started executing
948                          * the intent, i.e. because it couldn't unpack the
949                          * request.
950                          */
951                         LASSERT(it->it_status != 0);
952                         GOTO(out, rc = it->it_status);
953                 }
954                 rc = it_open_error(DISP_IT_EXECD, it);
955                 if (rc)
956                         GOTO(out, rc);
957
958                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
959                 if (rc)
960                         GOTO(out, rc);
961
962                 /* keep requests around for the multiple phases of the call
963                  * this shows the DISP_XX must guarantee we make it into the
964                  * call
965                  */
966                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
967                     it_disposition(it, DISP_OPEN_CREATE) &&
968                     !it_open_error(DISP_OPEN_CREATE, it)) {
969                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
970                         /* balanced in ll_create_node */
971                         ptlrpc_request_addref(request);
972                 }
973                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
974                     it_disposition(it, DISP_OPEN_OPEN) &&
975                     !it_open_error(DISP_OPEN_OPEN, it)) {
976                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
977                         /* balanced in ll_file_open */
978                         ptlrpc_request_addref(request);
979                         /* BUG 11546 - eviction in the middle of open rpc
980                          * processing
981                          */
982                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
983                                          obd_timeout);
984                 }
985
986                 if (it->it_op & IT_CREAT) {
987                         /* XXX this belongs in ll_create_it */
988                 } else if (it->it_op == IT_OPEN) {
989                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
990                 } else {
991                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
992                 }
993         }
994
995         /* If we already have a matching lock, then cancel the new
996          * one.  We have to set the data here instead of in
997          * mdc_enqueue, because we need to use the child's inode as
998          * the l_ast_data to match, and that's not available until
999          * intent_finish has performed the iget().) */
1000         lock = ldlm_handle2lock(lockh);
1001         if (lock) {
1002                 union ldlm_policy_data policy = lock->l_policy_data;
1003                 LDLM_DEBUG(lock, "matching against this");
1004
1005                 if (it_has_reply_body(it)) {
1006                         struct mdt_body *body;
1007
1008                         body = req_capsule_server_get(&request->rq_pill,
1009                                                       &RMF_MDT_BODY);
1010                         /* mdc_enqueue checked */
1011                         LASSERT(body != NULL);
1012                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1013                                                  &lock->l_resource->lr_name),
1014                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1015                                  PLDLMRES(lock->l_resource),
1016                                  PFID(&body->mbo_fid1));
1017                 }
1018                 LDLM_LOCK_PUT(lock);
1019
1020                 memcpy(&old_lock, lockh, sizeof(*lockh));
1021                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1022                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1023                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1024                         memcpy(lockh, &old_lock, sizeof(old_lock));
1025                         it->it_lock_handle = lockh->cookie;
1026                 }
1027         }
1028
1029         EXIT;
1030 out:
1031         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1032                 (int)op_data->op_namelen, op_data->op_name,
1033                 ldlm_it2str(it->it_op), it->it_status,
1034                 it->it_disposition, rc);
1035         return rc;
1036 }
1037
1038 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1039                         struct lu_fid *fid, __u64 *bits)
1040 {
1041         /* We could just return 1 immediately, but since we should only
1042          * be called in revalidate_it if we already have a lock, let's
1043          * verify that. */
1044         struct ldlm_res_id res_id;
1045         struct lustre_handle lockh;
1046         union ldlm_policy_data policy;
1047         enum ldlm_mode mode;
1048         ENTRY;
1049
1050         if (it->it_lock_handle) {
1051                 lockh.cookie = it->it_lock_handle;
1052                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1053         } else {
1054                 fid_build_reg_res_name(fid, &res_id);
1055                 switch (it->it_op) {
1056                 case IT_GETATTR:
1057                         /* File attributes are held under multiple bits:
1058                          * nlink is under lookup lock, size and times are
1059                          * under UPDATE lock and recently we've also got
1060                          * a separate permissions lock for owner/group/acl that
1061                          * were protected by lookup lock before.
1062                          * Getattr must provide all of that information,
1063                          * so we need to ensure we have all of those locks.
1064                          * Unfortunately, if the bits are split across multiple
1065                          * locks, there's no easy way to match all of them here,
1066                          * so an extra RPC would be performed to fetch all
1067                          * of those bits at once for now. */
1068                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1069                          * but for old MDTs (< 2.4), permission is covered
1070                          * by LOOKUP lock, so it needs to match all bits here.*/
1071                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1072                                                   MDS_INODELOCK_LOOKUP |
1073                                                   MDS_INODELOCK_PERM;
1074                         break;
1075                 case IT_READDIR:
1076                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1077                         break;
1078                 case IT_LAYOUT:
1079                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1080                         break;
1081                 default:
1082                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1083                         break;
1084                 }
1085
1086                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1087                                       LDLM_IBITS, &policy,
1088                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1089                                       &lockh);
1090         }
1091
1092         if (mode) {
1093                 it->it_lock_handle = lockh.cookie;
1094                 it->it_lock_mode = mode;
1095         } else {
1096                 it->it_lock_handle = 0;
1097                 it->it_lock_mode = 0;
1098         }
1099
1100         RETURN(!!mode);
1101 }
1102
1103 /*
1104  * This long block is all about fixing up the lock and request state
1105  * so that it is correct as of the moment _before_ the operation was
1106  * applied; that way, the VFS will think that everything is normal and
1107  * call Lustre's regular VFS methods.
1108  *
1109  * If we're performing a creation, that means that unless the creation
1110  * failed with EEXIST, we should fake up a negative dentry.
1111  *
1112  * For everything else, we want to lookup to succeed.
1113  *
1114  * One additional note: if CREATE or OPEN succeeded, we add an extra
1115  * reference to the request because we need to keep it around until
1116  * ll_create/ll_open gets called.
1117  *
1118  * The server will return to us, in it_disposition, an indication of
1119  * exactly what it_status refers to.
1120  *
1121  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1122  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1123  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1124  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1125  * was successful.
1126  *
1127  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1128  * child lookup.
1129  */
1130 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1131                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1132                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1133 {
1134         struct ldlm_enqueue_info einfo = {
1135                 .ei_type        = LDLM_IBITS,
1136                 .ei_mode        = it_to_lock_mode(it),
1137                 .ei_cb_bl       = cb_blocking,
1138                 .ei_cb_cp       = ldlm_completion_ast,
1139         };
1140         struct lustre_handle lockh;
1141         int rc = 0;
1142         ENTRY;
1143         LASSERT(it);
1144
1145         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1146                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1147                 op_data->op_name, PFID(&op_data->op_fid2),
1148                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1149                 it->it_flags);
1150
1151         lockh.cookie = 0;
1152         if (fid_is_sane(&op_data->op_fid2) &&
1153             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1154                 /* We could just return 1 immediately, but since we should only
1155                  * be called in revalidate_it if we already have a lock, let's
1156                  * verify that. */
1157                 it->it_lock_handle = 0;
1158                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1159                 /* Only return failure if it was not GETATTR by cfid
1160                    (from inode_revalidate) */
1161                 if (rc || op_data->op_namelen != 0)
1162                         RETURN(rc);
1163         }
1164
1165         /* For case if upper layer did not alloc fid, do it now. */
1166         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1167                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1168                 if (rc < 0) {
1169                         CERROR("Can't alloc new fid, rc %d\n", rc);
1170                         RETURN(rc);
1171                 }
1172         }
1173
1174         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1175                               extra_lock_flags);
1176         if (rc < 0)
1177                 RETURN(rc);
1178
1179         *reqp = it->it_request;
1180         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1181         RETURN(rc);
1182 }
1183
1184 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1185                                               struct ptlrpc_request *req,
1186                                               void *args, int rc)
1187 {
1188         struct mdc_getattr_args  *ga = args;
1189         struct obd_export        *exp = ga->ga_exp;
1190         struct md_enqueue_info   *minfo = ga->ga_minfo;
1191         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1192         struct lookup_intent     *it;
1193         struct lustre_handle     *lockh;
1194         struct obd_device        *obddev;
1195         struct ldlm_reply        *lockrep;
1196         __u64                     flags = LDLM_FL_HAS_INTENT;
1197         ENTRY;
1198
1199         it    = &minfo->mi_it;
1200         lockh = &minfo->mi_lockh;
1201
1202         obddev = class_exp2obd(exp);
1203
1204         obd_put_request_slot(&obddev->u.cli);
1205         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1206                 rc = -ETIMEDOUT;
1207
1208         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1209                                    &flags, NULL, 0, lockh, rc);
1210         if (rc < 0) {
1211                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1212                 mdc_clear_replay_flag(req, rc);
1213                 GOTO(out, rc);
1214         }
1215
1216         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1217         LASSERT(lockrep != NULL);
1218
1219         lockrep->lock_policy_res2 =
1220                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1221
1222         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1223         if (rc)
1224                 GOTO(out, rc);
1225
1226         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1227         EXIT;
1228
1229 out:
1230         minfo->mi_cb(req, minfo, rc);
1231         return 0;
1232 }
1233
1234 int mdc_intent_getattr_async(struct obd_export *exp,
1235                              struct md_enqueue_info *minfo)
1236 {
1237         struct md_op_data       *op_data = &minfo->mi_data;
1238         struct lookup_intent    *it = &minfo->mi_it;
1239         struct ptlrpc_request   *req;
1240         struct mdc_getattr_args *ga;
1241         struct obd_device       *obddev = class_exp2obd(exp);
1242         struct ldlm_res_id       res_id;
1243         union ldlm_policy_data policy = {
1244                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1245                                                  MDS_INODELOCK_UPDATE } };
1246         int                      rc = 0;
1247         __u64                    flags = LDLM_FL_HAS_INTENT;
1248         ENTRY;
1249
1250         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1251                 (int)op_data->op_namelen, op_data->op_name,
1252                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1253
1254         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1255         req = mdc_intent_getattr_pack(exp, it, op_data);
1256         if (IS_ERR(req))
1257                 RETURN(PTR_ERR(req));
1258
1259         rc = obd_get_request_slot(&obddev->u.cli);
1260         if (rc != 0) {
1261                 ptlrpc_req_finished(req);
1262                 RETURN(rc);
1263         }
1264
1265         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1266                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1267         if (rc < 0) {
1268                 obd_put_request_slot(&obddev->u.cli);
1269                 ptlrpc_req_finished(req);
1270                 RETURN(rc);
1271         }
1272
1273         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1274         ga = ptlrpc_req_async_args(req);
1275         ga->ga_exp = exp;
1276         ga->ga_minfo = minfo;
1277
1278         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1279         ptlrpcd_add_req(req);
1280
1281         RETURN(0);
1282 }