Whamcloud - gitweb
dad1954ca17d2cec2bca57dc04894a0ee50267e7
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN  250
341 #define GA_DEFAULT_EA_NUM      10
342
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345                          struct lookup_intent *it,
346                          struct md_op_data *op_data)
347 {
348         struct ptlrpc_request   *req;
349         struct ldlm_intent      *lit;
350         int                     rc, count = 0;
351         struct list_head        cancels = LIST_HEAD_INIT(cancels);
352
353         ENTRY;
354
355         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356                                         &RQF_LDLM_INTENT_GETXATTR);
357         if (req == NULL)
358                 RETURN(ERR_PTR(-ENOMEM));
359
360         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
361         if (rc) {
362                 ptlrpc_request_free(req);
363                 RETURN(ERR_PTR(rc));
364         }
365
366         /* pack the intent */
367         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368         lit->opc = IT_GETXATTR;
369
370         /* pack the intended request */
371         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
373                       -1, 0);
374
375         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
377
378         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
385
386         ptlrpc_request_set_replen(req);
387
388         RETURN(req);
389 }
390
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392                                                      struct lookup_intent *it,
393                                                      struct md_op_data *op_data)
394 {
395         struct ptlrpc_request *req;
396         struct obd_device     *obddev = class_exp2obd(exp);
397         struct ldlm_intent    *lit;
398         int                    rc;
399         ENTRY;
400
401         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402                                    &RQF_LDLM_INTENT_UNLINK);
403         if (req == NULL)
404                 RETURN(ERR_PTR(-ENOMEM));
405
406         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407                              op_data->op_namelen + 1);
408
409         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
410         if (rc) {
411                 ptlrpc_request_free(req);
412                 RETURN(ERR_PTR(rc));
413         }
414
415         /* pack the intent */
416         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417         lit->opc = (__u64)it->it_op;
418
419         /* pack the intended request */
420         mdc_unlink_pack(req, op_data);
421
422         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423                              obddev->u.cli.cl_default_mds_easize);
424         ptlrpc_request_set_replen(req);
425         RETURN(req);
426 }
427
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429                                                       struct lookup_intent *it,
430                                                       struct md_op_data *op_data)
431 {
432         struct ptlrpc_request   *req;
433         struct obd_device       *obddev = class_exp2obd(exp);
434         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436                                          OBD_MD_MEA | OBD_MD_FLACL;
437         struct ldlm_intent      *lit;
438         int                      rc;
439         __u32                    easize;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443                                    &RQF_LDLM_INTENT_GETATTR);
444         if (req == NULL)
445                 RETURN(ERR_PTR(-ENOMEM));
446
447         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448                              op_data->op_namelen + 1);
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         /* pack the intent */
457         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458         lit->opc = (__u64)it->it_op;
459
460         if (obddev->u.cli.cl_default_mds_easize > 0)
461                 easize = obddev->u.cli.cl_default_mds_easize;
462         else
463                 easize = obddev->u.cli.cl_max_mds_easize;
464
465         /* pack the intended request */
466         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
467
468         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470                              req->rq_import->imp_connect_data.ocd_max_easize);
471         ptlrpc_request_set_replen(req);
472         RETURN(req);
473 }
474
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476                                                      struct lookup_intent *it,
477                                                      struct md_op_data *op_data)
478 {
479         struct obd_device     *obd = class_exp2obd(exp);
480         struct ptlrpc_request *req;
481         struct ldlm_intent    *lit;
482         struct layout_intent  *layout;
483         int rc;
484         ENTRY;
485
486         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487                                 &RQF_LDLM_INTENT_LAYOUT);
488         if (req == NULL)
489                 RETURN(ERR_PTR(-ENOMEM));
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
493         if (rc) {
494                 ptlrpc_request_free(req);
495                 RETURN(ERR_PTR(rc));
496         }
497
498         /* pack the intent */
499         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500         lit->opc = (__u64)it->it_op;
501
502         /* pack the layout intent request */
503         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504         LASSERT(op_data->op_data != NULL);
505         LASSERT(op_data->op_data_size == sizeof(*layout));
506         memcpy(layout, op_data->op_data, sizeof(*layout));
507
508         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509                              obd->u.cli.cl_default_mds_easize);
510         ptlrpc_request_set_replen(req);
511         RETURN(req);
512 }
513
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
516 {
517         struct ptlrpc_request *req;
518         int rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
522         if (req == NULL)
523                 RETURN(ERR_PTR(-ENOMEM));
524
525         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
526         if (rc) {
527                 ptlrpc_request_free(req);
528                 RETURN(ERR_PTR(rc));
529         }
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532         ptlrpc_request_set_replen(req);
533         RETURN(req);
534 }
535
536 static int mdc_finish_enqueue(struct obd_export *exp,
537                               struct ptlrpc_request *req,
538                               struct ldlm_enqueue_info *einfo,
539                               struct lookup_intent *it,
540                               struct lustre_handle *lockh,
541                               int rc)
542 {
543         struct req_capsule  *pill = &req->rq_pill;
544         struct ldlm_request *lockreq;
545         struct ldlm_reply   *lockrep;
546         struct ldlm_lock    *lock;
547         void                *lvb_data = NULL;
548         __u32                lvb_len = 0;
549         ENTRY;
550
551         LASSERT(rc >= 0);
552         /* Similarly, if we're going to replay this request, we don't want to
553          * actually get a lock, just perform the intent. */
554         if (req->rq_transno || req->rq_replay) {
555                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
556                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
557         }
558
559         if (rc == ELDLM_LOCK_ABORTED) {
560                 einfo->ei_mode = 0;
561                 memset(lockh, 0, sizeof(*lockh));
562                 rc = 0;
563         } else { /* rc = 0 */
564                 lock = ldlm_handle2lock(lockh);
565                 LASSERT(lock != NULL);
566
567                 /* If the server gave us back a different lock mode, we should
568                  * fix up our variables. */
569                 if (lock->l_req_mode != einfo->ei_mode) {
570                         ldlm_lock_addref(lockh, lock->l_req_mode);
571                         ldlm_lock_decref(lockh, einfo->ei_mode);
572                         einfo->ei_mode = lock->l_req_mode;
573                 }
574                 LDLM_LOCK_PUT(lock);
575         }
576
577         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
578         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
579
580         it->it_disposition = (int)lockrep->lock_policy_res1;
581         it->it_status = (int)lockrep->lock_policy_res2;
582         it->it_lock_mode = einfo->ei_mode;
583         it->it_lock_handle = lockh->cookie;
584         it->it_request = req;
585
586         /* Technically speaking rq_transno must already be zero if
587          * it_status is in error, so the check is a bit redundant */
588         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
589                 mdc_clear_replay_flag(req, it->it_status);
590
591         /* If we're doing an IT_OPEN which did not result in an actual
592          * successful open, then we need to remove the bit which saves
593          * this request for unconditional replay.
594          *
595          * It's important that we do this first!  Otherwise we might exit the
596          * function without doing so, and try to replay a failed create
597          * (bug 3440) */
598         if (it->it_op & IT_OPEN && req->rq_replay &&
599             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
600                 mdc_clear_replay_flag(req, it->it_status);
601
602         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
603                   it->it_op, it->it_disposition, it->it_status);
604
605         /* We know what to expect, so we do any byte flipping required here */
606         if (it_has_reply_body(it)) {
607                 struct mdt_body *body;
608
609                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
610                 if (body == NULL) {
611                         CERROR ("Can't swab mdt_body\n");
612                         RETURN (-EPROTO);
613                 }
614
615                 if (it_disposition(it, DISP_OPEN_OPEN) &&
616                     !it_open_error(DISP_OPEN_OPEN, it)) {
617                         /*
618                          * If this is a successful OPEN request, we need to set
619                          * replay handler and data early, so that if replay
620                          * happens immediately after swabbing below, new reply
621                          * is swabbed by that handler correctly.
622                          */
623                         mdc_set_open_replay_data(NULL, NULL, it);
624                 }
625
626                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
627                         void *eadata;
628
629                         mdc_update_max_ea_from_body(exp, body);
630
631                         /*
632                          * The eadata is opaque; just check that it is there.
633                          * Eventually, obd_unpackmd() will check the contents.
634                          */
635                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636                                                         body->mbo_eadatasize);
637                         if (eadata == NULL)
638                                 RETURN(-EPROTO);
639
640                         /* save lvb data and length in case this is for layout
641                          * lock */
642                         lvb_data = eadata;
643                         lvb_len = body->mbo_eadatasize;
644
645                         /*
646                          * We save the reply LOV EA in case we have to replay a
647                          * create for recovery.  If we didn't allocate a large
648                          * enough request buffer above we need to reallocate it
649                          * here to hold the actual LOV EA.
650                          *
651                          * To not save LOV EA if request is not going to replay
652                          * (for example error one).
653                          */
654                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
655                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656                                                     body->mbo_eadatasize);
657                                 if (rc) {
658                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
659                                         body->mbo_eadatasize = 0;
660                                         rc = 0;
661                                 }
662                         }
663                 }
664         } else if (it->it_op & IT_LAYOUT) {
665                 /* maybe the lock was granted right away and layout
666                  * is packed into RMF_DLM_LVB of req */
667                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
668                 if (lvb_len > 0) {
669                         lvb_data = req_capsule_server_sized_get(pill,
670                                                         &RMF_DLM_LVB, lvb_len);
671                         if (lvb_data == NULL)
672                                 RETURN(-EPROTO);
673
674                         /**
675                          * save replied layout data to the request buffer for
676                          * recovery consideration (lest MDS reinitialize
677                          * another set of OST objects).
678                          */
679                         if (req->rq_transno)
680                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
681                                                      lvb_len);
682                 }
683         }
684
685         /* fill in stripe data for layout lock.
686          * LU-6581: trust layout data only if layout lock is granted. The MDT
687          * has stopped sending layout unless the layout lock is granted. The
688          * client still does this checking in case it's talking with an old
689          * server. - Jinshan */
690         lock = ldlm_handle2lock(lockh);
691         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
692             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
693                 void *lmm;
694
695                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
696                         ldlm_it2str(it->it_op), lvb_len);
697
698                 OBD_ALLOC_LARGE(lmm, lvb_len);
699                 if (lmm == NULL) {
700                         LDLM_LOCK_PUT(lock);
701                         RETURN(-ENOMEM);
702                 }
703                 memcpy(lmm, lvb_data, lvb_len);
704
705                 /* install lvb_data */
706                 lock_res_and_lock(lock);
707                 if (lock->l_lvb_data == NULL) {
708                         lock->l_lvb_type = LVB_T_LAYOUT;
709                         lock->l_lvb_data = lmm;
710                         lock->l_lvb_len = lvb_len;
711                         lmm = NULL;
712                 }
713                 unlock_res_and_lock(lock);
714                 if (lmm != NULL)
715                         OBD_FREE_LARGE(lmm, lvb_len);
716         }
717         if (lock != NULL)
718                 LDLM_LOCK_PUT(lock);
719
720         RETURN(rc);
721 }
722
723 /* We always reserve enough space in the reply packet for a stripe MD, because
724  * we don't know in advance the file type. */
725 static int mdc_enqueue_base(struct obd_export *exp,
726                             struct ldlm_enqueue_info *einfo,
727                             const union ldlm_policy_data *policy,
728                             struct lookup_intent *it,
729                             struct md_op_data *op_data,
730                             struct lustre_handle *lockh,
731                             __u64 extra_lock_flags)
732 {
733         struct obd_device *obddev = class_exp2obd(exp);
734         struct ptlrpc_request *req = NULL;
735         __u64 flags, saved_flags = extra_lock_flags;
736         struct ldlm_res_id res_id;
737         static const union ldlm_policy_data lookup_policy = {
738                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739         static const union ldlm_policy_data update_policy = {
740                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
741         static const union ldlm_policy_data layout_policy = {
742                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743         static const union ldlm_policy_data getxattr_policy = {
744                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
745         int generation, resends = 0;
746         struct ldlm_reply *lockrep;
747         enum lvb_type lvb_type = 0;
748         int rc;
749         ENTRY;
750
751         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
752                  einfo->ei_type);
753         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
754
755         if (it != NULL) {
756                 LASSERT(policy == NULL);
757
758                 saved_flags |= LDLM_FL_HAS_INTENT;
759                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
760                         policy = &update_policy;
761                 else if (it->it_op & IT_LAYOUT)
762                         policy = &layout_policy;
763                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
764                         policy = &getxattr_policy;
765                 else
766                         policy = &lookup_policy;
767         }
768
769         generation = obddev->u.cli.cl_import->imp_generation;
770 resend:
771         flags = saved_flags;
772         if (it == NULL) {
773                 /* The only way right now is FLOCK. */
774                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
775                          einfo->ei_type);
776                 res_id.name[3] = LDLM_FLOCK;
777         } else if (it->it_op & IT_OPEN) {
778                 req = mdc_intent_open_pack(exp, it, op_data);
779         } else if (it->it_op & IT_UNLINK) {
780                 req = mdc_intent_unlink_pack(exp, it, op_data);
781         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
782                 req = mdc_intent_getattr_pack(exp, it, op_data);
783         } else if (it->it_op & IT_READDIR) {
784                 req = mdc_enqueue_pack(exp, 0);
785         } else if (it->it_op & IT_LAYOUT) {
786                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
787                         RETURN(-EOPNOTSUPP);
788                 req = mdc_intent_layout_pack(exp, it, op_data);
789                 lvb_type = LVB_T_LAYOUT;
790         } else if (it->it_op & IT_GETXATTR) {
791                 req = mdc_intent_getxattr_pack(exp, it, op_data);
792         } else {
793                 LBUG();
794                 RETURN(-EINVAL);
795         }
796
797         if (IS_ERR(req))
798                 RETURN(PTR_ERR(req));
799
800         if (resends) {
801                 req->rq_generation_set = 1;
802                 req->rq_import_generation = generation;
803                 req->rq_sent = ktime_get_real_seconds() + resends;
804         }
805
806         /* It is important to obtain modify RPC slot first (if applicable), so
807          * that threads that are waiting for a modify RPC slot are not polluting
808          * our rpcs in flight counter.
809          * We do not do flock request limiting, though */
810         if (it) {
811                 mdc_get_mod_rpc_slot(req, it);
812                 rc = obd_get_request_slot(&obddev->u.cli);
813                 if (rc != 0) {
814                         mdc_put_mod_rpc_slot(req, it);
815                         mdc_clear_replay_flag(req, 0);
816                         ptlrpc_req_finished(req);
817                         RETURN(rc);
818                 }
819         }
820
821         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
822                               0, lvb_type, lockh, 0);
823         if (!it) {
824                 /* For flock requests we immediatelly return without further
825                    delay and let caller deal with the rest, since rest of
826                    this function metadata processing makes no sense for flock
827                    requests anyway. But in case of problem during comms with
828                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
829                    can not rely on caller and this mainly for F_UNLCKs
830                    (explicits or automatically generated by Kernel to clean
831                    current FLocks upon exit) that can't be trashed */
832                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
833                     (einfo->ei_type == LDLM_FLOCK) &&
834                     (einfo->ei_mode == LCK_NL))
835                         goto resend;
836                 RETURN(rc);
837         }
838
839         obd_put_request_slot(&obddev->u.cli);
840         mdc_put_mod_rpc_slot(req, it);
841
842         if (rc < 0) {
843                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
844                        obddev->obd_name, rc);
845
846                 mdc_clear_replay_flag(req, rc);
847                 ptlrpc_req_finished(req);
848                 RETURN(rc);
849         }
850
851         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
852         LASSERT(lockrep != NULL);
853
854         lockrep->lock_policy_res2 =
855                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
856
857         /* Retry infinitely when the server returns -EINPROGRESS for the
858          * intent operation, when server returns -EINPROGRESS for acquiring
859          * intent lock, we'll retry in after_reply(). */
860         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
861                 mdc_clear_replay_flag(req, rc);
862                 ptlrpc_req_finished(req);
863                 resends++;
864
865                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
866                        obddev->obd_name, resends, it->it_op,
867                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
868
869                 if (generation == obddev->u.cli.cl_import->imp_generation) {
870                         goto resend;
871                 } else {
872                         CDEBUG(D_HA, "resend cross eviction\n");
873                         RETURN(-EIO);
874                 }
875         }
876
877         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
878         if (rc < 0) {
879                 if (lustre_handle_is_used(lockh)) {
880                         ldlm_lock_decref(lockh, einfo->ei_mode);
881                         memset(lockh, 0, sizeof(*lockh));
882                 }
883                 ptlrpc_req_finished(req);
884
885                 it->it_lock_handle = 0;
886                 it->it_lock_mode = 0;
887                 it->it_request = NULL;
888         }
889
890         RETURN(rc);
891 }
892
893 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
894                 const union ldlm_policy_data *policy,
895                 struct md_op_data *op_data,
896                 struct lustre_handle *lockh, __u64 extra_lock_flags)
897 {
898         return mdc_enqueue_base(exp, einfo, policy, NULL,
899                                 op_data, lockh, extra_lock_flags);
900 }
901
902 static int mdc_finish_intent_lock(struct obd_export *exp,
903                                   struct ptlrpc_request *request,
904                                   struct md_op_data *op_data,
905                                   struct lookup_intent *it,
906                                   struct lustre_handle *lockh)
907 {
908         struct lustre_handle old_lock;
909         struct ldlm_lock *lock;
910         int rc = 0;
911         ENTRY;
912
913         LASSERT(request != NULL);
914         LASSERT(request != LP_POISON);
915         LASSERT(request->rq_repmsg != LP_POISON);
916
917         if (it->it_op & IT_READDIR)
918                 RETURN(0);
919
920         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
921                 if (it->it_status != 0)
922                         GOTO(out, rc = it->it_status);
923         } else {
924                 if (!it_disposition(it, DISP_IT_EXECD)) {
925                         /* The server failed before it even started executing
926                          * the intent, i.e. because it couldn't unpack the
927                          * request.
928                          */
929                         LASSERT(it->it_status != 0);
930                         GOTO(out, rc = it->it_status);
931                 }
932                 rc = it_open_error(DISP_IT_EXECD, it);
933                 if (rc)
934                         GOTO(out, rc);
935
936                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
937                 if (rc)
938                         GOTO(out, rc);
939
940                 /* keep requests around for the multiple phases of the call
941                  * this shows the DISP_XX must guarantee we make it into the
942                  * call
943                  */
944                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
945                     it_disposition(it, DISP_OPEN_CREATE) &&
946                     !it_open_error(DISP_OPEN_CREATE, it)) {
947                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
948                         /* balanced in ll_create_node */
949                         ptlrpc_request_addref(request);
950                 }
951                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
952                     it_disposition(it, DISP_OPEN_OPEN) &&
953                     !it_open_error(DISP_OPEN_OPEN, it)) {
954                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
955                         /* balanced in ll_file_open */
956                         ptlrpc_request_addref(request);
957                         /* BUG 11546 - eviction in the middle of open rpc
958                          * processing
959                          */
960                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
961                                          obd_timeout);
962                 }
963
964                 if (it->it_op & IT_CREAT) {
965                         /* XXX this belongs in ll_create_it */
966                 } else if (it->it_op == IT_OPEN) {
967                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
968                 } else {
969                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
970                 }
971         }
972
973         /* If we already have a matching lock, then cancel the new
974          * one.  We have to set the data here instead of in
975          * mdc_enqueue, because we need to use the child's inode as
976          * the l_ast_data to match, and that's not available until
977          * intent_finish has performed the iget().) */
978         lock = ldlm_handle2lock(lockh);
979         if (lock) {
980                 union ldlm_policy_data policy = lock->l_policy_data;
981                 LDLM_DEBUG(lock, "matching against this");
982
983                 if (it_has_reply_body(it)) {
984                         struct mdt_body *body;
985
986                         body = req_capsule_server_get(&request->rq_pill,
987                                                       &RMF_MDT_BODY);
988                         /* mdc_enqueue checked */
989                         LASSERT(body != NULL);
990                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
991                                                  &lock->l_resource->lr_name),
992                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
993                                  PLDLMRES(lock->l_resource),
994                                  PFID(&body->mbo_fid1));
995                 }
996                 LDLM_LOCK_PUT(lock);
997
998                 memcpy(&old_lock, lockh, sizeof(*lockh));
999                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1000                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1001                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1002                         memcpy(lockh, &old_lock, sizeof(old_lock));
1003                         it->it_lock_handle = lockh->cookie;
1004                 }
1005         }
1006
1007         EXIT;
1008 out:
1009         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1010                 (int)op_data->op_namelen, op_data->op_name,
1011                 ldlm_it2str(it->it_op), it->it_status,
1012                 it->it_disposition, rc);
1013         return rc;
1014 }
1015
1016 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1017                         struct lu_fid *fid, __u64 *bits)
1018 {
1019         /* We could just return 1 immediately, but since we should only
1020          * be called in revalidate_it if we already have a lock, let's
1021          * verify that. */
1022         struct ldlm_res_id res_id;
1023         struct lustre_handle lockh;
1024         union ldlm_policy_data policy;
1025         enum ldlm_mode mode;
1026         ENTRY;
1027
1028         if (it->it_lock_handle) {
1029                 lockh.cookie = it->it_lock_handle;
1030                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1031         } else {
1032                 fid_build_reg_res_name(fid, &res_id);
1033                 switch (it->it_op) {
1034                 case IT_GETATTR:
1035                         /* File attributes are held under multiple bits:
1036                          * nlink is under lookup lock, size and times are
1037                          * under UPDATE lock and recently we've also got
1038                          * a separate permissions lock for owner/group/acl that
1039                          * were protected by lookup lock before.
1040                          * Getattr must provide all of that information,
1041                          * so we need to ensure we have all of those locks.
1042                          * Unfortunately, if the bits are split across multiple
1043                          * locks, there's no easy way to match all of them here,
1044                          * so an extra RPC would be performed to fetch all
1045                          * of those bits at once for now. */
1046                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1047                          * but for old MDTs (< 2.4), permission is covered
1048                          * by LOOKUP lock, so it needs to match all bits here.*/
1049                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1050                                                   MDS_INODELOCK_LOOKUP |
1051                                                   MDS_INODELOCK_PERM;
1052                         break;
1053                 case IT_READDIR:
1054                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1055                         break;
1056                 case IT_LAYOUT:
1057                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1058                         break;
1059                 default:
1060                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1061                         break;
1062                 }
1063
1064                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1065                                       LDLM_IBITS, &policy,
1066                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1067                                       &lockh);
1068         }
1069
1070         if (mode) {
1071                 it->it_lock_handle = lockh.cookie;
1072                 it->it_lock_mode = mode;
1073         } else {
1074                 it->it_lock_handle = 0;
1075                 it->it_lock_mode = 0;
1076         }
1077
1078         RETURN(!!mode);
1079 }
1080
1081 /*
1082  * This long block is all about fixing up the lock and request state
1083  * so that it is correct as of the moment _before_ the operation was
1084  * applied; that way, the VFS will think that everything is normal and
1085  * call Lustre's regular VFS methods.
1086  *
1087  * If we're performing a creation, that means that unless the creation
1088  * failed with EEXIST, we should fake up a negative dentry.
1089  *
1090  * For everything else, we want to lookup to succeed.
1091  *
1092  * One additional note: if CREATE or OPEN succeeded, we add an extra
1093  * reference to the request because we need to keep it around until
1094  * ll_create/ll_open gets called.
1095  *
1096  * The server will return to us, in it_disposition, an indication of
1097  * exactly what it_status refers to.
1098  *
1099  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1100  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1101  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1102  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1103  * was successful.
1104  *
1105  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1106  * child lookup.
1107  */
1108 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1109                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1110                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1111 {
1112         struct ldlm_enqueue_info einfo = {
1113                 .ei_type        = LDLM_IBITS,
1114                 .ei_mode        = it_to_lock_mode(it),
1115                 .ei_cb_bl       = cb_blocking,
1116                 .ei_cb_cp       = ldlm_completion_ast,
1117         };
1118         struct lustre_handle lockh;
1119         int rc = 0;
1120         ENTRY;
1121         LASSERT(it);
1122
1123         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1124                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1125                 op_data->op_name, PFID(&op_data->op_fid2),
1126                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1127                 it->it_flags);
1128
1129         lockh.cookie = 0;
1130         if (fid_is_sane(&op_data->op_fid2) &&
1131             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1132                 /* We could just return 1 immediately, but since we should only
1133                  * be called in revalidate_it if we already have a lock, let's
1134                  * verify that. */
1135                 it->it_lock_handle = 0;
1136                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1137                 /* Only return failure if it was not GETATTR by cfid
1138                    (from inode_revalidate) */
1139                 if (rc || op_data->op_namelen != 0)
1140                         RETURN(rc);
1141         }
1142
1143         /* For case if upper layer did not alloc fid, do it now. */
1144         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1145                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1146                 if (rc < 0) {
1147                         CERROR("Can't alloc new fid, rc %d\n", rc);
1148                         RETURN(rc);
1149                 }
1150         }
1151
1152         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1153                               extra_lock_flags);
1154         if (rc < 0)
1155                 RETURN(rc);
1156
1157         *reqp = it->it_request;
1158         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1159         RETURN(rc);
1160 }
1161
1162 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1163                                               struct ptlrpc_request *req,
1164                                               void *args, int rc)
1165 {
1166         struct mdc_getattr_args  *ga = args;
1167         struct obd_export        *exp = ga->ga_exp;
1168         struct md_enqueue_info   *minfo = ga->ga_minfo;
1169         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1170         struct lookup_intent     *it;
1171         struct lustre_handle     *lockh;
1172         struct obd_device        *obddev;
1173         struct ldlm_reply        *lockrep;
1174         __u64                     flags = LDLM_FL_HAS_INTENT;
1175         ENTRY;
1176
1177         it    = &minfo->mi_it;
1178         lockh = &minfo->mi_lockh;
1179
1180         obddev = class_exp2obd(exp);
1181
1182         obd_put_request_slot(&obddev->u.cli);
1183         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1184                 rc = -ETIMEDOUT;
1185
1186         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1187                                    &flags, NULL, 0, lockh, rc);
1188         if (rc < 0) {
1189                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1190                 mdc_clear_replay_flag(req, rc);
1191                 GOTO(out, rc);
1192         }
1193
1194         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1195         LASSERT(lockrep != NULL);
1196
1197         lockrep->lock_policy_res2 =
1198                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1199
1200         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1201         if (rc)
1202                 GOTO(out, rc);
1203
1204         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1205         EXIT;
1206
1207 out:
1208         minfo->mi_cb(req, minfo, rc);
1209         return 0;
1210 }
1211
1212 int mdc_intent_getattr_async(struct obd_export *exp,
1213                              struct md_enqueue_info *minfo)
1214 {
1215         struct md_op_data       *op_data = &minfo->mi_data;
1216         struct lookup_intent    *it = &minfo->mi_it;
1217         struct ptlrpc_request   *req;
1218         struct mdc_getattr_args *ga;
1219         struct obd_device       *obddev = class_exp2obd(exp);
1220         struct ldlm_res_id       res_id;
1221         union ldlm_policy_data policy = {
1222                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1223                                                  MDS_INODELOCK_UPDATE } };
1224         int                      rc = 0;
1225         __u64                    flags = LDLM_FL_HAS_INTENT;
1226         ENTRY;
1227
1228         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1229                 (int)op_data->op_namelen, op_data->op_name,
1230                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1231
1232         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1233         req = mdc_intent_getattr_pack(exp, it, op_data);
1234         if (IS_ERR(req))
1235                 RETURN(PTR_ERR(req));
1236
1237         rc = obd_get_request_slot(&obddev->u.cli);
1238         if (rc != 0) {
1239                 ptlrpc_req_finished(req);
1240                 RETURN(rc);
1241         }
1242
1243         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1244                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1245         if (rc < 0) {
1246                 obd_put_request_slot(&obddev->u.cli);
1247                 ptlrpc_req_finished(req);
1248                 RETURN(rc);
1249         }
1250
1251         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1252         ga = ptlrpc_req_async_args(req);
1253         ga->ga_exp = exp;
1254         ga->ga_minfo = minfo;
1255
1256         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1257         ptlrpcd_add_req(req);
1258
1259         RETURN(0);
1260 }