Whamcloud - gitweb
LU-10155 recovery: support setstripe replay
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
/*
 * Pairs an export with a metadata enqueue descriptor.
 * NOTE(review): presumably the callback argument block for an asynchronous
 * intent getattr; no user is visible in this part of the file - confirm.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export - TODO confirm role */
	struct md_enqueue_info	*ga_minfo;	/* enqueue info - TODO confirm role */
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         if (cl_is_lov_delay_create(it->it_flags)) {
304                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
305                 LASSERT(lmmsize == 0);
306                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
307         } else {
308                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
309                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
310         }
311
312         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
313                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
314                              strlen(op_data->op_file_secctx_name) + 1 : 0);
315
316         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
317                              op_data->op_file_secctx_size);
318
319         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
320         if (rc < 0) {
321                 ptlrpc_request_free(req);
322                 RETURN(ERR_PTR(rc));
323         }
324
325         spin_lock(&req->rq_lock);
326         req->rq_replay = req->rq_import->imp_replayable;
327         spin_unlock(&req->rq_lock);
328
329         /* pack the intent */
330         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
331         lit->opc = (__u64)it->it_op;
332
333         /* pack the intended request */
334         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
335                       lmmsize);
336
337         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
338                              obddev->u.cli.cl_max_mds_easize);
339         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
340                              req->rq_import->imp_connect_data.ocd_max_easize);
341         ptlrpc_request_set_replen(req);
342         return req;
343 }
344
/*
 * Defaults used to size the reply buffers of a getxattr intent:
 * GA_DEFAULT_EA_NUM entries, each with GA_DEFAULT_EA_NAME_LEN bytes of
 * name and GA_DEFAULT_EA_VAL_LEN bytes of value.
 */
#define GA_DEFAULT_EA_NAME_LEN	20
#define GA_DEFAULT_EA_VAL_LEN	250
#define GA_DEFAULT_EA_NUM	10
348
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351                          struct lookup_intent *it,
352                          struct md_op_data *op_data)
353 {
354         struct ptlrpc_request   *req;
355         struct ldlm_intent      *lit;
356         int                     rc, count = 0;
357         struct list_head        cancels = LIST_HEAD_INIT(cancels);
358
359         ENTRY;
360
361         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362                                         &RQF_LDLM_INTENT_GETXATTR);
363         if (req == NULL)
364                 RETURN(ERR_PTR(-ENOMEM));
365
366         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
367         if (rc) {
368                 ptlrpc_request_free(req);
369                 RETURN(ERR_PTR(rc));
370         }
371
372         /* pack the intent */
373         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374         lit->opc = IT_GETXATTR;
375
376         /* pack the intended request */
377         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
378                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
379                       -1, 0);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
382                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
385                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
386
387         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
388                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
391
392         ptlrpc_request_set_replen(req);
393
394         RETURN(req);
395 }
396
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398                                                      struct lookup_intent *it,
399                                                      struct md_op_data *op_data)
400 {
401         struct ptlrpc_request *req;
402         struct obd_device     *obddev = class_exp2obd(exp);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_UNLINK);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413                              op_data->op_namelen + 1);
414
415         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
416         if (rc) {
417                 ptlrpc_request_free(req);
418                 RETURN(ERR_PTR(rc));
419         }
420
421         /* pack the intent */
422         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423         lit->opc = (__u64)it->it_op;
424
425         /* pack the intended request */
426         mdc_unlink_pack(req, op_data);
427
428         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429                              obddev->u.cli.cl_default_mds_easize);
430         ptlrpc_request_set_replen(req);
431         RETURN(req);
432 }
433
434 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
435                                                       struct lookup_intent *it,
436                                                       struct md_op_data *op_data)
437 {
438         struct ptlrpc_request   *req;
439         struct obd_device       *obddev = class_exp2obd(exp);
440         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442                                          OBD_MD_MEA | OBD_MD_FLACL;
443         struct ldlm_intent      *lit;
444         int                      rc;
445         __u32                    easize;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
449                                    &RQF_LDLM_INTENT_GETATTR);
450         if (req == NULL)
451                 RETURN(ERR_PTR(-ENOMEM));
452
453         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
454                              op_data->op_namelen + 1);
455
456         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457         if (rc) {
458                 ptlrpc_request_free(req);
459                 RETURN(ERR_PTR(rc));
460         }
461
462         /* pack the intent */
463         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
464         lit->opc = (__u64)it->it_op;
465
466         if (obddev->u.cli.cl_default_mds_easize > 0)
467                 easize = obddev->u.cli.cl_default_mds_easize;
468         else
469                 easize = obddev->u.cli.cl_max_mds_easize;
470
471         /* pack the intended request */
472         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
473
474         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
475         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
476                              req->rq_import->imp_connect_data.ocd_max_easize);
477         ptlrpc_request_set_replen(req);
478         RETURN(req);
479 }
480
481 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
482                                                      struct lookup_intent *it,
483                                                      struct md_op_data *op_data)
484 {
485         struct obd_device     *obd = class_exp2obd(exp);
486         struct ptlrpc_request *req;
487         struct ldlm_intent    *lit;
488         struct layout_intent  *layout;
489         int rc;
490         ENTRY;
491
492         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493                                 &RQF_LDLM_INTENT_LAYOUT);
494         if (req == NULL)
495                 RETURN(ERR_PTR(-ENOMEM));
496
497         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
498         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
499         if (rc) {
500                 ptlrpc_request_free(req);
501                 RETURN(ERR_PTR(rc));
502         }
503
504         /* pack the intent */
505         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
506         lit->opc = (__u64)it->it_op;
507
508         /* pack the layout intent request */
509         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
510         LASSERT(op_data->op_data != NULL);
511         LASSERT(op_data->op_data_size == sizeof(*layout));
512         memcpy(layout, op_data->op_data, sizeof(*layout));
513
514         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
515                              obd->u.cli.cl_default_mds_easize);
516         ptlrpc_request_set_replen(req);
517         RETURN(req);
518 }
519
520 static struct ptlrpc_request *
521 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
522 {
523         struct ptlrpc_request *req;
524         int rc;
525         ENTRY;
526
527         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
528         if (req == NULL)
529                 RETURN(ERR_PTR(-ENOMEM));
530
531         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
532         if (rc) {
533                 ptlrpc_request_free(req);
534                 RETURN(ERR_PTR(rc));
535         }
536
537         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
538         ptlrpc_request_set_replen(req);
539         RETURN(req);
540 }
541
542 static int mdc_finish_enqueue(struct obd_export *exp,
543                               struct ptlrpc_request *req,
544                               struct ldlm_enqueue_info *einfo,
545                               struct lookup_intent *it,
546                               struct lustre_handle *lockh,
547                               int rc)
548 {
549         struct req_capsule  *pill = &req->rq_pill;
550         struct ldlm_request *lockreq;
551         struct ldlm_reply   *lockrep;
552         struct ldlm_lock    *lock;
553         struct mdt_body     *body = NULL;
554         void                *lvb_data = NULL;
555         __u32                lvb_len = 0;
556
557         ENTRY;
558
559         LASSERT(rc >= 0);
560         /* Similarly, if we're going to replay this request, we don't want to
561          * actually get a lock, just perform the intent. */
562         if (req->rq_transno || req->rq_replay) {
563                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
564                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
565         }
566
567         if (rc == ELDLM_LOCK_ABORTED) {
568                 einfo->ei_mode = 0;
569                 memset(lockh, 0, sizeof(*lockh));
570                 rc = 0;
571         } else { /* rc = 0 */
572                 lock = ldlm_handle2lock(lockh);
573                 LASSERT(lock != NULL);
574
575                 /* If the server gave us back a different lock mode, we should
576                  * fix up our variables. */
577                 if (lock->l_req_mode != einfo->ei_mode) {
578                         ldlm_lock_addref(lockh, lock->l_req_mode);
579                         ldlm_lock_decref(lockh, einfo->ei_mode);
580                         einfo->ei_mode = lock->l_req_mode;
581                 }
582                 LDLM_LOCK_PUT(lock);
583         }
584
585         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
586         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
587
588         it->it_disposition = (int)lockrep->lock_policy_res1;
589         it->it_status = (int)lockrep->lock_policy_res2;
590         it->it_lock_mode = einfo->ei_mode;
591         it->it_lock_handle = lockh->cookie;
592         it->it_request = req;
593
594         /* Technically speaking rq_transno must already be zero if
595          * it_status is in error, so the check is a bit redundant */
596         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
597                 mdc_clear_replay_flag(req, it->it_status);
598
599         /* If we're doing an IT_OPEN which did not result in an actual
600          * successful open, then we need to remove the bit which saves
601          * this request for unconditional replay.
602          *
603          * It's important that we do this first!  Otherwise we might exit the
604          * function without doing so, and try to replay a failed create
605          * (bug 3440) */
606         if (it->it_op & IT_OPEN && req->rq_replay &&
607             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
608                 mdc_clear_replay_flag(req, it->it_status);
609
610         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
611                   it->it_op, it->it_disposition, it->it_status);
612
613         /* We know what to expect, so we do any byte flipping required here */
614         if (it_has_reply_body(it)) {
615                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
616                 if (body == NULL) {
617                         CERROR ("Can't swab mdt_body\n");
618                         RETURN (-EPROTO);
619                 }
620
621                 if (it_disposition(it, DISP_OPEN_OPEN) &&
622                     !it_open_error(DISP_OPEN_OPEN, it)) {
623                         /*
624                          * If this is a successful OPEN request, we need to set
625                          * replay handler and data early, so that if replay
626                          * happens immediately after swabbing below, new reply
627                          * is swabbed by that handler correctly.
628                          */
629                         mdc_set_open_replay_data(NULL, NULL, it);
630                 }
631
632                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
633                         void *eadata;
634
635                         mdc_update_max_ea_from_body(exp, body);
636
637                         /*
638                          * The eadata is opaque; just check that it is there.
639                          * Eventually, obd_unpackmd() will check the contents.
640                          */
641                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
642                                                         body->mbo_eadatasize);
643                         if (eadata == NULL)
644                                 RETURN(-EPROTO);
645
646                         /* save lvb data and length in case this is for layout
647                          * lock */
648                         lvb_data = eadata;
649                         lvb_len = body->mbo_eadatasize;
650
651                         /*
652                          * We save the reply LOV EA in case we have to replay a
653                          * create for recovery.  If we didn't allocate a large
654                          * enough request buffer above we need to reallocate it
655                          * here to hold the actual LOV EA.
656                          *
657                          * To not save LOV EA if request is not going to replay
658                          * (for example error one).
659                          */
660                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
661                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
662                                                     body->mbo_eadatasize);
663                                 if (rc) {
664                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
665                                         body->mbo_eadatasize = 0;
666                                         rc = 0;
667                                 }
668                         }
669                 }
670         } else if (it->it_op & IT_LAYOUT) {
671                 /* maybe the lock was granted right away and layout
672                  * is packed into RMF_DLM_LVB of req */
673                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
674                 if (lvb_len > 0) {
675                         lvb_data = req_capsule_server_sized_get(pill,
676                                                         &RMF_DLM_LVB, lvb_len);
677                         if (lvb_data == NULL)
678                                 RETURN(-EPROTO);
679
680                         /**
681                          * save replied layout data to the request buffer for
682                          * recovery consideration (lest MDS reinitialize
683                          * another set of OST objects).
684                          */
685                         if (req->rq_transno)
686                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
687                                                      lvb_len);
688                 }
689         }
690
691         /* fill in stripe data for layout lock.
692          * LU-6581: trust layout data only if layout lock is granted. The MDT
693          * has stopped sending layout unless the layout lock is granted. The
694          * client still does this checking in case it's talking with an old
695          * server. - Jinshan */
696         lock = ldlm_handle2lock(lockh);
697         if (lock == NULL)
698                 RETURN(rc);
699
700         if (ldlm_has_layout(lock) && lvb_data != NULL &&
701             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
702                 void *lmm;
703
704                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
705                         ldlm_it2str(it->it_op), lvb_len);
706
707                 OBD_ALLOC_LARGE(lmm, lvb_len);
708                 if (lmm == NULL)
709                         GOTO(out_lock, rc = -ENOMEM);
710
711                 memcpy(lmm, lvb_data, lvb_len);
712
713                 /* install lvb_data */
714                 lock_res_and_lock(lock);
715                 if (lock->l_lvb_data == NULL) {
716                         lock->l_lvb_type = LVB_T_LAYOUT;
717                         lock->l_lvb_data = lmm;
718                         lock->l_lvb_len = lvb_len;
719                         lmm = NULL;
720                 }
721                 unlock_res_and_lock(lock);
722                 if (lmm != NULL)
723                         OBD_FREE_LARGE(lmm, lvb_len);
724         }
725
726         if (ldlm_has_dom(lock)) {
727                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
728
729                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
730                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
731                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
732                                    exp->exp_obd->obd_name);
733                         GOTO(out_lock, rc = -EPROTO);
734                 }
735
736                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
737                            ldlm_it2str(it->it_op), body->mbo_dom_size);
738
739                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
740         }
741 out_lock:
742         LDLM_LOCK_PUT(lock);
743
744         RETURN(rc);
745 }
746
747 /* We always reserve enough space in the reply packet for a stripe MD, because
748  * we don't know in advance the file type. */
749 static int mdc_enqueue_base(struct obd_export *exp,
750                             struct ldlm_enqueue_info *einfo,
751                             const union ldlm_policy_data *policy,
752                             struct lookup_intent *it,
753                             struct md_op_data *op_data,
754                             struct lustre_handle *lockh,
755                             __u64 extra_lock_flags)
756 {
757         struct obd_device *obddev = class_exp2obd(exp);
758         struct ptlrpc_request *req = NULL;
759         __u64 flags, saved_flags = extra_lock_flags;
760         struct ldlm_res_id res_id;
761         static const union ldlm_policy_data lookup_policy = {
762                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
763         static const union ldlm_policy_data update_policy = {
764                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
765         static const union ldlm_policy_data layout_policy = {
766                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
767         static const union ldlm_policy_data getxattr_policy = {
768                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
769         int generation, resends = 0;
770         struct ldlm_reply *lockrep;
771         enum lvb_type lvb_type = 0;
772         int rc;
773         ENTRY;
774
775         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
776                  einfo->ei_type);
777         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
778
779         if (it != NULL) {
780                 LASSERT(policy == NULL);
781
782                 saved_flags |= LDLM_FL_HAS_INTENT;
783                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
784                         policy = &update_policy;
785                 else if (it->it_op & IT_LAYOUT)
786                         policy = &layout_policy;
787                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
788                         policy = &getxattr_policy;
789                 else
790                         policy = &lookup_policy;
791         }
792
793         generation = obddev->u.cli.cl_import->imp_generation;
794 resend:
795         flags = saved_flags;
796         if (it == NULL) {
797                 /* The only way right now is FLOCK. */
798                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
799                          einfo->ei_type);
800                 res_id.name[3] = LDLM_FLOCK;
801         } else if (it->it_op & IT_OPEN) {
802                 req = mdc_intent_open_pack(exp, it, op_data);
803         } else if (it->it_op & IT_UNLINK) {
804                 req = mdc_intent_unlink_pack(exp, it, op_data);
805         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
806                 req = mdc_intent_getattr_pack(exp, it, op_data);
807         } else if (it->it_op & IT_READDIR) {
808                 req = mdc_enqueue_pack(exp, 0);
809         } else if (it->it_op & IT_LAYOUT) {
810                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
811                         RETURN(-EOPNOTSUPP);
812                 req = mdc_intent_layout_pack(exp, it, op_data);
813                 lvb_type = LVB_T_LAYOUT;
814         } else if (it->it_op & IT_GETXATTR) {
815                 req = mdc_intent_getxattr_pack(exp, it, op_data);
816         } else {
817                 LBUG();
818                 RETURN(-EINVAL);
819         }
820
821         if (IS_ERR(req))
822                 RETURN(PTR_ERR(req));
823
824         if (resends) {
825                 req->rq_generation_set = 1;
826                 req->rq_import_generation = generation;
827                 req->rq_sent = ktime_get_real_seconds() + resends;
828         }
829
830         /* It is important to obtain modify RPC slot first (if applicable), so
831          * that threads that are waiting for a modify RPC slot are not polluting
832          * our rpcs in flight counter.
833          * We do not do flock request limiting, though */
834         if (it) {
835                 mdc_get_mod_rpc_slot(req, it);
836                 rc = obd_get_request_slot(&obddev->u.cli);
837                 if (rc != 0) {
838                         mdc_put_mod_rpc_slot(req, it);
839                         mdc_clear_replay_flag(req, 0);
840                         ptlrpc_req_finished(req);
841                         RETURN(rc);
842                 }
843         }
844
845         /* With Data-on-MDT the glimpse callback is needed too.
846          * It is set here in advance but not in mdc_finish_enqueue()
847          * to avoid possible races. It is safe to have glimpse handler
848          * for non-DOM locks and costs nothing.*/
849         if (einfo->ei_cb_gl == NULL)
850                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
851
852         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
853                               0, lvb_type, lockh, 0);
854         if (!it) {
855                 /* For flock requests we immediatelly return without further
856                    delay and let caller deal with the rest, since rest of
857                    this function metadata processing makes no sense for flock
858                    requests anyway. But in case of problem during comms with
859                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
860                    can not rely on caller and this mainly for F_UNLCKs
861                    (explicits or automatically generated by Kernel to clean
862                    current FLocks upon exit) that can't be trashed */
863                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
864                     (einfo->ei_type == LDLM_FLOCK) &&
865                     (einfo->ei_mode == LCK_NL))
866                         goto resend;
867                 RETURN(rc);
868         }
869
870         obd_put_request_slot(&obddev->u.cli);
871         mdc_put_mod_rpc_slot(req, it);
872
873         if (rc < 0) {
874                 CDEBUG(D_INFO,
875                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
876                       obddev->obd_name, PFID(&op_data->op_fid1),
877                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
878
879                 mdc_clear_replay_flag(req, rc);
880                 ptlrpc_req_finished(req);
881                 RETURN(rc);
882         }
883
884         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
885         LASSERT(lockrep != NULL);
886
887         lockrep->lock_policy_res2 =
888                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
889
890         /* Retry infinitely when the server returns -EINPROGRESS for the
891          * intent operation, when server returns -EINPROGRESS for acquiring
892          * intent lock, we'll retry in after_reply(). */
893         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
894                 mdc_clear_replay_flag(req, rc);
895                 ptlrpc_req_finished(req);
896                 if (generation == obddev->u.cli.cl_import->imp_generation) {
897                         if (signal_pending(current))
898                                 RETURN(-EINTR);
899
900                         resends++;
901                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
902                                obddev->obd_name, resends, it->it_op,
903                                PFID(&op_data->op_fid1),
904                                PFID(&op_data->op_fid2));
905                         goto resend;
906                 } else {
907                         CDEBUG(D_HA, "resend cross eviction\n");
908                         RETURN(-EIO);
909                 }
910         }
911
912         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
913         if (rc < 0) {
914                 if (lustre_handle_is_used(lockh)) {
915                         ldlm_lock_decref(lockh, einfo->ei_mode);
916                         memset(lockh, 0, sizeof(*lockh));
917                 }
918                 ptlrpc_req_finished(req);
919
920                 it->it_lock_handle = 0;
921                 it->it_lock_mode = 0;
922                 it->it_request = NULL;
923         }
924
925         RETURN(rc);
926 }
927
928 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
929                 const union ldlm_policy_data *policy,
930                 struct md_op_data *op_data,
931                 struct lustre_handle *lockh, __u64 extra_lock_flags)
932 {
933         return mdc_enqueue_base(exp, einfo, policy, NULL,
934                                 op_data, lockh, extra_lock_flags);
935 }
936
937 static int mdc_finish_intent_lock(struct obd_export *exp,
938                                   struct ptlrpc_request *request,
939                                   struct md_op_data *op_data,
940                                   struct lookup_intent *it,
941                                   struct lustre_handle *lockh)
942 {
943         struct lustre_handle old_lock;
944         struct ldlm_lock *lock;
945         int rc = 0;
946         ENTRY;
947
948         LASSERT(request != NULL);
949         LASSERT(request != LP_POISON);
950         LASSERT(request->rq_repmsg != LP_POISON);
951
952         if (it->it_op & IT_READDIR)
953                 RETURN(0);
954
955         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
956                 if (it->it_status != 0)
957                         GOTO(out, rc = it->it_status);
958         } else {
959                 if (!it_disposition(it, DISP_IT_EXECD)) {
960                         /* The server failed before it even started executing
961                          * the intent, i.e. because it couldn't unpack the
962                          * request.
963                          */
964                         LASSERT(it->it_status != 0);
965                         GOTO(out, rc = it->it_status);
966                 }
967                 rc = it_open_error(DISP_IT_EXECD, it);
968                 if (rc)
969                         GOTO(out, rc);
970
971                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
972                 if (rc)
973                         GOTO(out, rc);
974
975                 /* keep requests around for the multiple phases of the call
976                  * this shows the DISP_XX must guarantee we make it into the
977                  * call
978                  */
979                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
980                     it_disposition(it, DISP_OPEN_CREATE) &&
981                     !it_open_error(DISP_OPEN_CREATE, it)) {
982                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
983                         /* balanced in ll_create_node */
984                         ptlrpc_request_addref(request);
985                 }
986                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
987                     it_disposition(it, DISP_OPEN_OPEN) &&
988                     !it_open_error(DISP_OPEN_OPEN, it)) {
989                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
990                         /* balanced in ll_file_open */
991                         ptlrpc_request_addref(request);
992                         /* BUG 11546 - eviction in the middle of open rpc
993                          * processing
994                          */
995                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
996                                          obd_timeout);
997                 }
998
999                 if (it->it_op & IT_CREAT) {
1000                         /* XXX this belongs in ll_create_it */
1001                 } else if (it->it_op == IT_OPEN) {
1002                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1003                 } else {
1004                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1005                 }
1006         }
1007
1008         /* If we already have a matching lock, then cancel the new
1009          * one.  We have to set the data here instead of in
1010          * mdc_enqueue, because we need to use the child's inode as
1011          * the l_ast_data to match, and that's not available until
1012          * intent_finish has performed the iget().) */
1013         lock = ldlm_handle2lock(lockh);
1014         if (lock) {
1015                 union ldlm_policy_data policy = lock->l_policy_data;
1016                 LDLM_DEBUG(lock, "matching against this");
1017
1018                 if (it_has_reply_body(it)) {
1019                         struct mdt_body *body;
1020
1021                         body = req_capsule_server_get(&request->rq_pill,
1022                                                       &RMF_MDT_BODY);
1023                         /* mdc_enqueue checked */
1024                         LASSERT(body != NULL);
1025                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1026                                                  &lock->l_resource->lr_name),
1027                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1028                                  PLDLMRES(lock->l_resource),
1029                                  PFID(&body->mbo_fid1));
1030                 }
1031                 LDLM_LOCK_PUT(lock);
1032
1033                 memcpy(&old_lock, lockh, sizeof(*lockh));
1034                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1035                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1036                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1037                         memcpy(lockh, &old_lock, sizeof(old_lock));
1038                         it->it_lock_handle = lockh->cookie;
1039                 }
1040         }
1041
1042         EXIT;
1043 out:
1044         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1045                 (int)op_data->op_namelen, op_data->op_name,
1046                 ldlm_it2str(it->it_op), it->it_status,
1047                 it->it_disposition, rc);
1048         return rc;
1049 }
1050
1051 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1052                         struct lu_fid *fid, __u64 *bits)
1053 {
1054         /* We could just return 1 immediately, but since we should only
1055          * be called in revalidate_it if we already have a lock, let's
1056          * verify that. */
1057         struct ldlm_res_id res_id;
1058         struct lustre_handle lockh;
1059         union ldlm_policy_data policy;
1060         enum ldlm_mode mode;
1061         ENTRY;
1062
1063         if (it->it_lock_handle) {
1064                 lockh.cookie = it->it_lock_handle;
1065                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1066         } else {
1067                 fid_build_reg_res_name(fid, &res_id);
1068                 switch (it->it_op) {
1069                 case IT_GETATTR:
1070                         /* File attributes are held under multiple bits:
1071                          * nlink is under lookup lock, size and times are
1072                          * under UPDATE lock and recently we've also got
1073                          * a separate permissions lock for owner/group/acl that
1074                          * were protected by lookup lock before.
1075                          * Getattr must provide all of that information,
1076                          * so we need to ensure we have all of those locks.
1077                          * Unfortunately, if the bits are split across multiple
1078                          * locks, there's no easy way to match all of them here,
1079                          * so an extra RPC would be performed to fetch all
1080                          * of those bits at once for now. */
1081                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1082                          * but for old MDTs (< 2.4), permission is covered
1083                          * by LOOKUP lock, so it needs to match all bits here.*/
1084                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1085                                                   MDS_INODELOCK_LOOKUP |
1086                                                   MDS_INODELOCK_PERM;
1087                         break;
1088                 case IT_READDIR:
1089                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1090                         break;
1091                 case IT_LAYOUT:
1092                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1093                         break;
1094                 default:
1095                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1096                         break;
1097                 }
1098
1099                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1100                                       LDLM_IBITS, &policy,
1101                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1102                                       &lockh);
1103         }
1104
1105         if (mode) {
1106                 it->it_lock_handle = lockh.cookie;
1107                 it->it_lock_mode = mode;
1108         } else {
1109                 it->it_lock_handle = 0;
1110                 it->it_lock_mode = 0;
1111         }
1112
1113         RETURN(!!mode);
1114 }
1115
1116 /*
1117  * This long block is all about fixing up the lock and request state
1118  * so that it is correct as of the moment _before_ the operation was
1119  * applied; that way, the VFS will think that everything is normal and
1120  * call Lustre's regular VFS methods.
1121  *
1122  * If we're performing a creation, that means that unless the creation
1123  * failed with EEXIST, we should fake up a negative dentry.
1124  *
1125  * For everything else, we want to lookup to succeed.
1126  *
1127  * One additional note: if CREATE or OPEN succeeded, we add an extra
1128  * reference to the request because we need to keep it around until
1129  * ll_create/ll_open gets called.
1130  *
1131  * The server will return to us, in it_disposition, an indication of
1132  * exactly what it_status refers to.
1133  *
1134  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1135  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1136  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1137  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1138  * was successful.
1139  *
1140  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1141  * child lookup.
1142  */
1143 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1144                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1145                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1146 {
1147         struct ldlm_enqueue_info einfo = {
1148                 .ei_type        = LDLM_IBITS,
1149                 .ei_mode        = it_to_lock_mode(it),
1150                 .ei_cb_bl       = cb_blocking,
1151                 .ei_cb_cp       = ldlm_completion_ast,
1152                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1153         };
1154         struct lustre_handle lockh;
1155         int rc = 0;
1156         ENTRY;
1157         LASSERT(it);
1158
1159         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1160                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1161                 op_data->op_name, PFID(&op_data->op_fid2),
1162                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1163                 it->it_flags);
1164
1165         lockh.cookie = 0;
1166         if (fid_is_sane(&op_data->op_fid2) &&
1167             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1168                 /* We could just return 1 immediately, but since we should only
1169                  * be called in revalidate_it if we already have a lock, let's
1170                  * verify that. */
1171                 it->it_lock_handle = 0;
1172                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1173                 /* Only return failure if it was not GETATTR by cfid
1174                    (from inode_revalidate) */
1175                 if (rc || op_data->op_namelen != 0)
1176                         RETURN(rc);
1177         }
1178
1179         /* For case if upper layer did not alloc fid, do it now. */
1180         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1181                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1182                 if (rc < 0) {
1183                         CERROR("Can't alloc new fid, rc %d\n", rc);
1184                         RETURN(rc);
1185                 }
1186         }
1187
1188         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1189                               extra_lock_flags);
1190         if (rc < 0)
1191                 RETURN(rc);
1192
1193         *reqp = it->it_request;
1194         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1195         RETURN(rc);
1196 }
1197
1198 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1199                                               struct ptlrpc_request *req,
1200                                               void *args, int rc)
1201 {
1202         struct mdc_getattr_args  *ga = args;
1203         struct obd_export        *exp = ga->ga_exp;
1204         struct md_enqueue_info   *minfo = ga->ga_minfo;
1205         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1206         struct lookup_intent     *it;
1207         struct lustre_handle     *lockh;
1208         struct obd_device        *obddev;
1209         struct ldlm_reply        *lockrep;
1210         __u64                     flags = LDLM_FL_HAS_INTENT;
1211         ENTRY;
1212
1213         it    = &minfo->mi_it;
1214         lockh = &minfo->mi_lockh;
1215
1216         obddev = class_exp2obd(exp);
1217
1218         obd_put_request_slot(&obddev->u.cli);
1219         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1220                 rc = -ETIMEDOUT;
1221
1222         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1223                                    &flags, NULL, 0, lockh, rc);
1224         if (rc < 0) {
1225                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1226                 mdc_clear_replay_flag(req, rc);
1227                 GOTO(out, rc);
1228         }
1229
1230         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1231         LASSERT(lockrep != NULL);
1232
1233         lockrep->lock_policy_res2 =
1234                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1235
1236         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1237         if (rc)
1238                 GOTO(out, rc);
1239
1240         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1241         EXIT;
1242
1243 out:
1244         minfo->mi_cb(req, minfo, rc);
1245         return 0;
1246 }
1247
1248 int mdc_intent_getattr_async(struct obd_export *exp,
1249                              struct md_enqueue_info *minfo)
1250 {
1251         struct md_op_data       *op_data = &minfo->mi_data;
1252         struct lookup_intent    *it = &minfo->mi_it;
1253         struct ptlrpc_request   *req;
1254         struct mdc_getattr_args *ga;
1255         struct obd_device       *obddev = class_exp2obd(exp);
1256         struct ldlm_res_id       res_id;
1257         union ldlm_policy_data policy = {
1258                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1259                                                  MDS_INODELOCK_UPDATE } };
1260         int                      rc = 0;
1261         __u64                    flags = LDLM_FL_HAS_INTENT;
1262         ENTRY;
1263
1264         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1265                 (int)op_data->op_namelen, op_data->op_name,
1266                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1267
1268         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1269         req = mdc_intent_getattr_pack(exp, it, op_data);
1270         if (IS_ERR(req))
1271                 RETURN(PTR_ERR(req));
1272
1273         rc = obd_get_request_slot(&obddev->u.cli);
1274         if (rc != 0) {
1275                 ptlrpc_req_finished(req);
1276                 RETURN(rc);
1277         }
1278
1279         /* With Data-on-MDT the glimpse callback is needed too.
1280          * It is set here in advance but not in mdc_finish_enqueue()
1281          * to avoid possible races. It is safe to have glimpse handler
1282          * for non-DOM locks and costs nothing.*/
1283         if (minfo->mi_einfo.ei_cb_gl == NULL)
1284                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1285
1286         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1287                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1288         if (rc < 0) {
1289                 obd_put_request_slot(&obddev->u.cli);
1290                 ptlrpc_req_finished(req);
1291                 RETURN(rc);
1292         }
1293
1294         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1295         ga = ptlrpc_req_async_args(req);
1296         ga->ga_exp = exp;
1297         ga->ga_minfo = minfo;
1298
1299         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1300         ptlrpcd_add_req(req);
1301
1302         RETURN(0);
1303 }