Whamcloud - gitweb
a85324d8f4b4296aa8d5f4e5f214451276d7f6bf
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 static struct ptlrpc_request *
340 mdc_intent_getxattr_pack(struct obd_export *exp,
341                          struct lookup_intent *it,
342                          struct md_op_data *op_data)
343 {
344         struct ptlrpc_request   *req;
345         struct ldlm_intent      *lit;
346         int                     rc, count = 0;
347         __u32                   maxdata;
348         struct list_head        cancels = LIST_HEAD_INIT(cancels);
349
350         ENTRY;
351
352         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
353                                         &RQF_LDLM_INTENT_GETXATTR);
354         if (req == NULL)
355                 RETURN(ERR_PTR(-ENOMEM));
356
357         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
358         if (rc) {
359                 ptlrpc_request_free(req);
360                 RETURN(ERR_PTR(rc));
361         }
362
363         /* pack the intent */
364         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
365         lit->opc = IT_GETXATTR;
366
367         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
368
369         /* pack the intended request */
370         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
371                       0);
372
373         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
374                                 RCL_SERVER, maxdata);
375
376         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
377                                 RCL_SERVER, maxdata);
378
379         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
380                                 RCL_SERVER, maxdata);
381
382         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, maxdata);
383
384         ptlrpc_request_set_replen(req);
385
386         RETURN(req);
387 }
388
389 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
390                                                      struct lookup_intent *it,
391                                                      struct md_op_data *op_data)
392 {
393         struct ptlrpc_request *req;
394         struct obd_device     *obddev = class_exp2obd(exp);
395         struct ldlm_intent    *lit;
396         int                    rc;
397         ENTRY;
398
399         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
400                                    &RQF_LDLM_INTENT_UNLINK);
401         if (req == NULL)
402                 RETURN(ERR_PTR(-ENOMEM));
403
404         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
405                              op_data->op_namelen + 1);
406
407         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
408         if (rc) {
409                 ptlrpc_request_free(req);
410                 RETURN(ERR_PTR(rc));
411         }
412
413         /* pack the intent */
414         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
415         lit->opc = (__u64)it->it_op;
416
417         /* pack the intended request */
418         mdc_unlink_pack(req, op_data);
419
420         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
421                              obddev->u.cli.cl_default_mds_easize);
422         ptlrpc_request_set_replen(req);
423         RETURN(req);
424 }
425
426 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
427                                                       struct lookup_intent *it,
428                                                       struct md_op_data *op_data)
429 {
430         struct ptlrpc_request   *req;
431         struct obd_device       *obddev = class_exp2obd(exp);
432         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
433                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
434                                          OBD_MD_MEA | OBD_MD_FLACL;
435         struct ldlm_intent      *lit;
436         int                      rc;
437         __u32                    easize;
438         ENTRY;
439
440         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
441                                    &RQF_LDLM_INTENT_GETATTR);
442         if (req == NULL)
443                 RETURN(ERR_PTR(-ENOMEM));
444
445         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
446                              op_data->op_namelen + 1);
447
448         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 RETURN(ERR_PTR(rc));
452         }
453
454         /* pack the intent */
455         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
456         lit->opc = (__u64)it->it_op;
457
458         if (obddev->u.cli.cl_default_mds_easize > 0)
459                 easize = obddev->u.cli.cl_default_mds_easize;
460         else
461                 easize = obddev->u.cli.cl_max_mds_easize;
462
463         /* pack the intended request */
464         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
465
466         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
467         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
468                              req->rq_import->imp_connect_data.ocd_max_easize);
469         ptlrpc_request_set_replen(req);
470         RETURN(req);
471 }
472
473 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
474                                                      struct lookup_intent *it,
475                                                      struct md_op_data *op_data)
476 {
477         struct obd_device     *obd = class_exp2obd(exp);
478         struct ptlrpc_request *req;
479         struct ldlm_intent    *lit;
480         struct layout_intent  *layout;
481         int rc;
482         ENTRY;
483
484         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
485                                 &RQF_LDLM_INTENT_LAYOUT);
486         if (req == NULL)
487                 RETURN(ERR_PTR(-ENOMEM));
488
489         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
490         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
491         if (rc) {
492                 ptlrpc_request_free(req);
493                 RETURN(ERR_PTR(rc));
494         }
495
496         /* pack the intent */
497         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498         lit->opc = (__u64)it->it_op;
499
500         /* pack the layout intent request */
501         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
502         LASSERT(op_data->op_data != NULL);
503         LASSERT(op_data->op_data_size == sizeof(*layout));
504         memcpy(layout, op_data->op_data, sizeof(*layout));
505
506         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
507                              obd->u.cli.cl_default_mds_easize);
508         ptlrpc_request_set_replen(req);
509         RETURN(req);
510 }
511
512 static struct ptlrpc_request *
513 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
514 {
515         struct ptlrpc_request *req;
516         int rc;
517         ENTRY;
518
519         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
520         if (req == NULL)
521                 RETURN(ERR_PTR(-ENOMEM));
522
523         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
524         if (rc) {
525                 ptlrpc_request_free(req);
526                 RETURN(ERR_PTR(rc));
527         }
528
529         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
530         ptlrpc_request_set_replen(req);
531         RETURN(req);
532 }
533
534 static int mdc_finish_enqueue(struct obd_export *exp,
535                               struct ptlrpc_request *req,
536                               struct ldlm_enqueue_info *einfo,
537                               struct lookup_intent *it,
538                               struct lustre_handle *lockh,
539                               int rc)
540 {
541         struct req_capsule  *pill = &req->rq_pill;
542         struct ldlm_request *lockreq;
543         struct ldlm_reply   *lockrep;
544         struct ldlm_lock    *lock;
545         void                *lvb_data = NULL;
546         __u32                lvb_len = 0;
547         ENTRY;
548
549         LASSERT(rc >= 0);
550         /* Similarly, if we're going to replay this request, we don't want to
551          * actually get a lock, just perform the intent. */
552         if (req->rq_transno || req->rq_replay) {
553                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
554                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
555         }
556
557         if (rc == ELDLM_LOCK_ABORTED) {
558                 einfo->ei_mode = 0;
559                 memset(lockh, 0, sizeof(*lockh));
560                 rc = 0;
561         } else { /* rc = 0 */
562                 lock = ldlm_handle2lock(lockh);
563                 LASSERT(lock != NULL);
564
565                 /* If the server gave us back a different lock mode, we should
566                  * fix up our variables. */
567                 if (lock->l_req_mode != einfo->ei_mode) {
568                         ldlm_lock_addref(lockh, lock->l_req_mode);
569                         ldlm_lock_decref(lockh, einfo->ei_mode);
570                         einfo->ei_mode = lock->l_req_mode;
571                 }
572                 LDLM_LOCK_PUT(lock);
573         }
574
575         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
576         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
577
578         it->it_disposition = (int)lockrep->lock_policy_res1;
579         it->it_status = (int)lockrep->lock_policy_res2;
580         it->it_lock_mode = einfo->ei_mode;
581         it->it_lock_handle = lockh->cookie;
582         it->it_request = req;
583
584         /* Technically speaking rq_transno must already be zero if
585          * it_status is in error, so the check is a bit redundant */
586         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
587                 mdc_clear_replay_flag(req, it->it_status);
588
589         /* If we're doing an IT_OPEN which did not result in an actual
590          * successful open, then we need to remove the bit which saves
591          * this request for unconditional replay.
592          *
593          * It's important that we do this first!  Otherwise we might exit the
594          * function without doing so, and try to replay a failed create
595          * (bug 3440) */
596         if (it->it_op & IT_OPEN && req->rq_replay &&
597             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
598                 mdc_clear_replay_flag(req, it->it_status);
599
600         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
601                   it->it_op, it->it_disposition, it->it_status);
602
603         /* We know what to expect, so we do any byte flipping required here */
604         if (it_has_reply_body(it)) {
605                 struct mdt_body *body;
606
607                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
608                 if (body == NULL) {
609                         CERROR ("Can't swab mdt_body\n");
610                         RETURN (-EPROTO);
611                 }
612
613                 if (it_disposition(it, DISP_OPEN_OPEN) &&
614                     !it_open_error(DISP_OPEN_OPEN, it)) {
615                         /*
616                          * If this is a successful OPEN request, we need to set
617                          * replay handler and data early, so that if replay
618                          * happens immediately after swabbing below, new reply
619                          * is swabbed by that handler correctly.
620                          */
621                         mdc_set_open_replay_data(NULL, NULL, it);
622                 }
623
624                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
625                         void *eadata;
626
627                         mdc_update_max_ea_from_body(exp, body);
628
629                         /*
630                          * The eadata is opaque; just check that it is there.
631                          * Eventually, obd_unpackmd() will check the contents.
632                          */
633                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
634                                                         body->mbo_eadatasize);
635                         if (eadata == NULL)
636                                 RETURN(-EPROTO);
637
638                         /* save lvb data and length in case this is for layout
639                          * lock */
640                         lvb_data = eadata;
641                         lvb_len = body->mbo_eadatasize;
642
643                         /*
644                          * We save the reply LOV EA in case we have to replay a
645                          * create for recovery.  If we didn't allocate a large
646                          * enough request buffer above we need to reallocate it
647                          * here to hold the actual LOV EA.
648                          *
649                          * To not save LOV EA if request is not going to replay
650                          * (for example error one).
651                          */
652                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
653                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
654                                                     body->mbo_eadatasize);
655                                 if (rc) {
656                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
657                                         body->mbo_eadatasize = 0;
658                                         rc = 0;
659                                 }
660                         }
661                 }
662         } else if (it->it_op & IT_LAYOUT) {
663                 /* maybe the lock was granted right away and layout
664                  * is packed into RMF_DLM_LVB of req */
665                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
666                 if (lvb_len > 0) {
667                         lvb_data = req_capsule_server_sized_get(pill,
668                                                         &RMF_DLM_LVB, lvb_len);
669                         if (lvb_data == NULL)
670                                 RETURN(-EPROTO);
671
672                         /**
673                          * save replied layout data to the request buffer for
674                          * recovery consideration (lest MDS reinitialize
675                          * another set of OST objects).
676                          */
677                         if (req->rq_transno)
678                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
679                                                      lvb_len);
680                 }
681         }
682
683         /* fill in stripe data for layout lock.
684          * LU-6581: trust layout data only if layout lock is granted. The MDT
685          * has stopped sending layout unless the layout lock is granted. The
686          * client still does this checking in case it's talking with an old
687          * server. - Jinshan */
688         lock = ldlm_handle2lock(lockh);
689         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
690             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
691                 void *lmm;
692
693                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
694                         ldlm_it2str(it->it_op), lvb_len);
695
696                 OBD_ALLOC_LARGE(lmm, lvb_len);
697                 if (lmm == NULL) {
698                         LDLM_LOCK_PUT(lock);
699                         RETURN(-ENOMEM);
700                 }
701                 memcpy(lmm, lvb_data, lvb_len);
702
703                 /* install lvb_data */
704                 lock_res_and_lock(lock);
705                 if (lock->l_lvb_data == NULL) {
706                         lock->l_lvb_type = LVB_T_LAYOUT;
707                         lock->l_lvb_data = lmm;
708                         lock->l_lvb_len = lvb_len;
709                         lmm = NULL;
710                 }
711                 unlock_res_and_lock(lock);
712                 if (lmm != NULL)
713                         OBD_FREE_LARGE(lmm, lvb_len);
714         }
715         if (lock != NULL)
716                 LDLM_LOCK_PUT(lock);
717
718         RETURN(rc);
719 }
720
721 /* We always reserve enough space in the reply packet for a stripe MD, because
722  * we don't know in advance the file type. */
723 static int mdc_enqueue_base(struct obd_export *exp,
724                             struct ldlm_enqueue_info *einfo,
725                             const union ldlm_policy_data *policy,
726                             struct lookup_intent *it,
727                             struct md_op_data *op_data,
728                             struct lustre_handle *lockh,
729                             __u64 extra_lock_flags)
730 {
731         struct obd_device *obddev = class_exp2obd(exp);
732         struct ptlrpc_request *req = NULL;
733         __u64 flags, saved_flags = extra_lock_flags;
734         struct ldlm_res_id res_id;
735         static const union ldlm_policy_data lookup_policy = {
736                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
737         static const union ldlm_policy_data update_policy = {
738                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
739         static const union ldlm_policy_data layout_policy = {
740                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
741         static const union ldlm_policy_data getxattr_policy = {
742                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
743         int generation, resends = 0;
744         struct ldlm_reply *lockrep;
745         enum lvb_type lvb_type = 0;
746         int rc;
747         ENTRY;
748
749         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
750                  einfo->ei_type);
751         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
752
753         if (it != NULL) {
754                 LASSERT(policy == NULL);
755
756                 saved_flags |= LDLM_FL_HAS_INTENT;
757                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
758                         policy = &update_policy;
759                 else if (it->it_op & IT_LAYOUT)
760                         policy = &layout_policy;
761                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
762                         policy = &getxattr_policy;
763                 else
764                         policy = &lookup_policy;
765         }
766
767         generation = obddev->u.cli.cl_import->imp_generation;
768 resend:
769         flags = saved_flags;
770         if (it == NULL) {
771                 /* The only way right now is FLOCK. */
772                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
773                          einfo->ei_type);
774                 res_id.name[3] = LDLM_FLOCK;
775         } else if (it->it_op & IT_OPEN) {
776                 req = mdc_intent_open_pack(exp, it, op_data);
777         } else if (it->it_op & IT_UNLINK) {
778                 req = mdc_intent_unlink_pack(exp, it, op_data);
779         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
780                 req = mdc_intent_getattr_pack(exp, it, op_data);
781         } else if (it->it_op & IT_READDIR) {
782                 req = mdc_enqueue_pack(exp, 0);
783         } else if (it->it_op & IT_LAYOUT) {
784                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
785                         RETURN(-EOPNOTSUPP);
786                 req = mdc_intent_layout_pack(exp, it, op_data);
787                 lvb_type = LVB_T_LAYOUT;
788         } else if (it->it_op & IT_GETXATTR) {
789                 req = mdc_intent_getxattr_pack(exp, it, op_data);
790         } else {
791                 LBUG();
792                 RETURN(-EINVAL);
793         }
794
795         if (IS_ERR(req))
796                 RETURN(PTR_ERR(req));
797
798         if (resends) {
799                 req->rq_generation_set = 1;
800                 req->rq_import_generation = generation;
801                 req->rq_sent = ktime_get_real_seconds() + resends;
802         }
803
804         /* It is important to obtain modify RPC slot first (if applicable), so
805          * that threads that are waiting for a modify RPC slot are not polluting
806          * our rpcs in flight counter.
807          * We do not do flock request limiting, though */
808         if (it) {
809                 mdc_get_mod_rpc_slot(req, it);
810                 rc = obd_get_request_slot(&obddev->u.cli);
811                 if (rc != 0) {
812                         mdc_put_mod_rpc_slot(req, it);
813                         mdc_clear_replay_flag(req, 0);
814                         ptlrpc_req_finished(req);
815                         RETURN(rc);
816                 }
817         }
818
819         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
820                               0, lvb_type, lockh, 0);
821         if (!it) {
822                 /* For flock requests we immediatelly return without further
823                    delay and let caller deal with the rest, since rest of
824                    this function metadata processing makes no sense for flock
825                    requests anyway. But in case of problem during comms with
826                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
827                    can not rely on caller and this mainly for F_UNLCKs
828                    (explicits or automatically generated by Kernel to clean
829                    current FLocks upon exit) that can't be trashed */
830                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
831                     (einfo->ei_type == LDLM_FLOCK) &&
832                     (einfo->ei_mode == LCK_NL))
833                         goto resend;
834                 RETURN(rc);
835         }
836
837         obd_put_request_slot(&obddev->u.cli);
838         mdc_put_mod_rpc_slot(req, it);
839
840         if (rc < 0) {
841                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
842                        obddev->obd_name, rc);
843
844                 mdc_clear_replay_flag(req, rc);
845                 ptlrpc_req_finished(req);
846                 RETURN(rc);
847         }
848
849         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
850         LASSERT(lockrep != NULL);
851
852         lockrep->lock_policy_res2 =
853                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
854
855         /* Retry infinitely when the server returns -EINPROGRESS for the
856          * intent operation, when server returns -EINPROGRESS for acquiring
857          * intent lock, we'll retry in after_reply(). */
858         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
859                 mdc_clear_replay_flag(req, rc);
860                 ptlrpc_req_finished(req);
861                 resends++;
862
863                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
864                        obddev->obd_name, resends, it->it_op,
865                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
866
867                 if (generation == obddev->u.cli.cl_import->imp_generation) {
868                         goto resend;
869                 } else {
870                         CDEBUG(D_HA, "resend cross eviction\n");
871                         RETURN(-EIO);
872                 }
873         }
874
875         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
876         if (rc < 0) {
877                 if (lustre_handle_is_used(lockh)) {
878                         ldlm_lock_decref(lockh, einfo->ei_mode);
879                         memset(lockh, 0, sizeof(*lockh));
880                 }
881                 ptlrpc_req_finished(req);
882
883                 it->it_lock_handle = 0;
884                 it->it_lock_mode = 0;
885                 it->it_request = NULL;
886         }
887
888         RETURN(rc);
889 }
890
891 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
892                 const union ldlm_policy_data *policy,
893                 struct md_op_data *op_data,
894                 struct lustre_handle *lockh, __u64 extra_lock_flags)
895 {
896         return mdc_enqueue_base(exp, einfo, policy, NULL,
897                                 op_data, lockh, extra_lock_flags);
898 }
899
900 static int mdc_finish_intent_lock(struct obd_export *exp,
901                                   struct ptlrpc_request *request,
902                                   struct md_op_data *op_data,
903                                   struct lookup_intent *it,
904                                   struct lustre_handle *lockh)
905 {
906         struct lustre_handle old_lock;
907         struct ldlm_lock *lock;
908         int rc = 0;
909         ENTRY;
910
911         LASSERT(request != NULL);
912         LASSERT(request != LP_POISON);
913         LASSERT(request->rq_repmsg != LP_POISON);
914
915         if (it->it_op & IT_READDIR)
916                 RETURN(0);
917
918         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
919                 if (it->it_status != 0)
920                         GOTO(out, rc = it->it_status);
921         } else {
922                 if (!it_disposition(it, DISP_IT_EXECD)) {
923                         /* The server failed before it even started executing
924                          * the intent, i.e. because it couldn't unpack the
925                          * request.
926                          */
927                         LASSERT(it->it_status != 0);
928                         GOTO(out, rc = it->it_status);
929                 }
930                 rc = it_open_error(DISP_IT_EXECD, it);
931                 if (rc)
932                         GOTO(out, rc);
933
934                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
935                 if (rc)
936                         GOTO(out, rc);
937
938                 /* keep requests around for the multiple phases of the call
939                  * this shows the DISP_XX must guarantee we make it into the
940                  * call
941                  */
942                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
943                     it_disposition(it, DISP_OPEN_CREATE) &&
944                     !it_open_error(DISP_OPEN_CREATE, it)) {
945                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
946                         /* balanced in ll_create_node */
947                         ptlrpc_request_addref(request);
948                 }
949                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
950                     it_disposition(it, DISP_OPEN_OPEN) &&
951                     !it_open_error(DISP_OPEN_OPEN, it)) {
952                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
953                         /* balanced in ll_file_open */
954                         ptlrpc_request_addref(request);
955                         /* BUG 11546 - eviction in the middle of open rpc
956                          * processing
957                          */
958                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
959                                          obd_timeout);
960                 }
961
962                 if (it->it_op & IT_CREAT) {
963                         /* XXX this belongs in ll_create_it */
964                 } else if (it->it_op == IT_OPEN) {
965                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
966                 } else {
967                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
968                 }
969         }
970
971         /* If we already have a matching lock, then cancel the new
972          * one.  We have to set the data here instead of in
973          * mdc_enqueue, because we need to use the child's inode as
974          * the l_ast_data to match, and that's not available until
975          * intent_finish has performed the iget().) */
976         lock = ldlm_handle2lock(lockh);
977         if (lock) {
978                 union ldlm_policy_data policy = lock->l_policy_data;
979                 LDLM_DEBUG(lock, "matching against this");
980
981                 if (it_has_reply_body(it)) {
982                         struct mdt_body *body;
983
984                         body = req_capsule_server_get(&request->rq_pill,
985                                                       &RMF_MDT_BODY);
986                         /* mdc_enqueue checked */
987                         LASSERT(body != NULL);
988                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
989                                                  &lock->l_resource->lr_name),
990                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
991                                  PLDLMRES(lock->l_resource),
992                                  PFID(&body->mbo_fid1));
993                 }
994                 LDLM_LOCK_PUT(lock);
995
996                 memcpy(&old_lock, lockh, sizeof(*lockh));
997                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
998                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
999                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1000                         memcpy(lockh, &old_lock, sizeof(old_lock));
1001                         it->it_lock_handle = lockh->cookie;
1002                 }
1003         }
1004
1005         EXIT;
1006 out:
1007         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1008                 (int)op_data->op_namelen, op_data->op_name,
1009                 ldlm_it2str(it->it_op), it->it_status,
1010                 it->it_disposition, rc);
1011         return rc;
1012 }
1013
1014 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1015                         struct lu_fid *fid, __u64 *bits)
1016 {
1017         /* We could just return 1 immediately, but since we should only
1018          * be called in revalidate_it if we already have a lock, let's
1019          * verify that. */
1020         struct ldlm_res_id res_id;
1021         struct lustre_handle lockh;
1022         union ldlm_policy_data policy;
1023         enum ldlm_mode mode;
1024         ENTRY;
1025
1026         if (it->it_lock_handle) {
1027                 lockh.cookie = it->it_lock_handle;
1028                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1029         } else {
1030                 fid_build_reg_res_name(fid, &res_id);
1031                 switch (it->it_op) {
1032                 case IT_GETATTR:
1033                         /* File attributes are held under multiple bits:
1034                          * nlink is under lookup lock, size and times are
1035                          * under UPDATE lock and recently we've also got
1036                          * a separate permissions lock for owner/group/acl that
1037                          * were protected by lookup lock before.
1038                          * Getattr must provide all of that information,
1039                          * so we need to ensure we have all of those locks.
1040                          * Unfortunately, if the bits are split across multiple
1041                          * locks, there's no easy way to match all of them here,
1042                          * so an extra RPC would be performed to fetch all
1043                          * of those bits at once for now. */
1044                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1045                          * but for old MDTs (< 2.4), permission is covered
1046                          * by LOOKUP lock, so it needs to match all bits here.*/
1047                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1048                                                   MDS_INODELOCK_LOOKUP |
1049                                                   MDS_INODELOCK_PERM;
1050                         break;
1051                 case IT_READDIR:
1052                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1053                         break;
1054                 case IT_LAYOUT:
1055                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1056                         break;
1057                 default:
1058                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1059                         break;
1060                 }
1061
1062                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1063                                       LDLM_IBITS, &policy,
1064                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1065                                       &lockh);
1066         }
1067
1068         if (mode) {
1069                 it->it_lock_handle = lockh.cookie;
1070                 it->it_lock_mode = mode;
1071         } else {
1072                 it->it_lock_handle = 0;
1073                 it->it_lock_mode = 0;
1074         }
1075
1076         RETURN(!!mode);
1077 }
1078
1079 /*
1080  * This long block is all about fixing up the lock and request state
1081  * so that it is correct as of the moment _before_ the operation was
1082  * applied; that way, the VFS will think that everything is normal and
1083  * call Lustre's regular VFS methods.
1084  *
1085  * If we're performing a creation, that means that unless the creation
1086  * failed with EEXIST, we should fake up a negative dentry.
1087  *
1088  * For everything else, we want to lookup to succeed.
1089  *
1090  * One additional note: if CREATE or OPEN succeeded, we add an extra
1091  * reference to the request because we need to keep it around until
1092  * ll_create/ll_open gets called.
1093  *
1094  * The server will return to us, in it_disposition, an indication of
1095  * exactly what it_status refers to.
1096  *
1097  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1098  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1099  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1100  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1101  * was successful.
1102  *
1103  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1104  * child lookup.
1105  */
1106 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1107                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1108                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1109 {
1110         struct ldlm_enqueue_info einfo = {
1111                 .ei_type        = LDLM_IBITS,
1112                 .ei_mode        = it_to_lock_mode(it),
1113                 .ei_cb_bl       = cb_blocking,
1114                 .ei_cb_cp       = ldlm_completion_ast,
1115         };
1116         struct lustre_handle lockh;
1117         int rc = 0;
1118         ENTRY;
1119         LASSERT(it);
1120
1121         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1122                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1123                 op_data->op_name, PFID(&op_data->op_fid2),
1124                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1125                 it->it_flags);
1126
1127         lockh.cookie = 0;
1128         if (fid_is_sane(&op_data->op_fid2) &&
1129             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1130                 /* We could just return 1 immediately, but since we should only
1131                  * be called in revalidate_it if we already have a lock, let's
1132                  * verify that. */
1133                 it->it_lock_handle = 0;
1134                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1135                 /* Only return failure if it was not GETATTR by cfid
1136                    (from inode_revalidate) */
1137                 if (rc || op_data->op_namelen != 0)
1138                         RETURN(rc);
1139         }
1140
1141         /* For case if upper layer did not alloc fid, do it now. */
1142         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1143                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1144                 if (rc < 0) {
1145                         CERROR("Can't alloc new fid, rc %d\n", rc);
1146                         RETURN(rc);
1147                 }
1148         }
1149
1150         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1151                               extra_lock_flags);
1152         if (rc < 0)
1153                 RETURN(rc);
1154
1155         *reqp = it->it_request;
1156         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1157         RETURN(rc);
1158 }
1159
1160 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1161                                               struct ptlrpc_request *req,
1162                                               void *args, int rc)
1163 {
1164         struct mdc_getattr_args  *ga = args;
1165         struct obd_export        *exp = ga->ga_exp;
1166         struct md_enqueue_info   *minfo = ga->ga_minfo;
1167         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1168         struct lookup_intent     *it;
1169         struct lustre_handle     *lockh;
1170         struct obd_device        *obddev;
1171         struct ldlm_reply        *lockrep;
1172         __u64                     flags = LDLM_FL_HAS_INTENT;
1173         ENTRY;
1174
1175         it    = &minfo->mi_it;
1176         lockh = &minfo->mi_lockh;
1177
1178         obddev = class_exp2obd(exp);
1179
1180         obd_put_request_slot(&obddev->u.cli);
1181         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1182                 rc = -ETIMEDOUT;
1183
1184         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1185                                    &flags, NULL, 0, lockh, rc);
1186         if (rc < 0) {
1187                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1188                 mdc_clear_replay_flag(req, rc);
1189                 GOTO(out, rc);
1190         }
1191
1192         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1193         LASSERT(lockrep != NULL);
1194
1195         lockrep->lock_policy_res2 =
1196                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1197
1198         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1199         if (rc)
1200                 GOTO(out, rc);
1201
1202         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1203         EXIT;
1204
1205 out:
1206         minfo->mi_cb(req, minfo, rc);
1207         return 0;
1208 }
1209
1210 int mdc_intent_getattr_async(struct obd_export *exp,
1211                              struct md_enqueue_info *minfo)
1212 {
1213         struct md_op_data       *op_data = &minfo->mi_data;
1214         struct lookup_intent    *it = &minfo->mi_it;
1215         struct ptlrpc_request   *req;
1216         struct mdc_getattr_args *ga;
1217         struct obd_device       *obddev = class_exp2obd(exp);
1218         struct ldlm_res_id       res_id;
1219         union ldlm_policy_data policy = {
1220                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1221                                                  MDS_INODELOCK_UPDATE } };
1222         int                      rc = 0;
1223         __u64                    flags = LDLM_FL_HAS_INTENT;
1224         ENTRY;
1225
1226         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1227                 (int)op_data->op_namelen, op_data->op_name,
1228                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1229
1230         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1231         req = mdc_intent_getattr_pack(exp, it, op_data);
1232         if (IS_ERR(req))
1233                 RETURN(PTR_ERR(req));
1234
1235         rc = obd_get_request_slot(&obddev->u.cli);
1236         if (rc != 0) {
1237                 ptlrpc_req_finished(req);
1238                 RETURN(rc);
1239         }
1240
1241         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1242                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1243         if (rc < 0) {
1244                 obd_put_request_slot(&obddev->u.cli);
1245                 ptlrpc_req_finished(req);
1246                 RETURN(rc);
1247         }
1248
1249         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1250         ga = ptlrpc_req_async_args(req);
1251         ga->ga_exp = exp;
1252         ga->ga_minfo = minfo;
1253
1254         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1255         ptlrpcd_add_req(req);
1256
1257         RETURN(0);
1258 }