Whamcloud - gitweb
LU-9008 pfl: dynamic layout modification with write/truncate
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
49 struct mdc_getattr_args {
50         struct obd_export               *ga_exp;
51         struct md_enqueue_info          *ga_minfo;
52 };
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         ptlrpc_request_set_replen(req);
334         return req;
335 }
336
337 static struct ptlrpc_request *
338 mdc_intent_getxattr_pack(struct obd_export *exp,
339                          struct lookup_intent *it,
340                          struct md_op_data *op_data)
341 {
342         struct ptlrpc_request   *req;
343         struct ldlm_intent      *lit;
344         int                     rc, count = 0;
345         __u32                   maxdata;
346         struct list_head        cancels = LIST_HEAD_INIT(cancels);
347
348         ENTRY;
349
350         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
351                                         &RQF_LDLM_INTENT_GETXATTR);
352         if (req == NULL)
353                 RETURN(ERR_PTR(-ENOMEM));
354
355         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
356         if (rc) {
357                 ptlrpc_request_free(req);
358                 RETURN(ERR_PTR(rc));
359         }
360
361         /* pack the intent */
362         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363         lit->opc = IT_GETXATTR;
364
365         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
366
367         /* pack the intended request */
368         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
369                       0);
370
371         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
372                                 RCL_SERVER, maxdata);
373
374         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
375                                 RCL_SERVER, maxdata);
376
377         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
378                                 RCL_SERVER, maxdata);
379
380         ptlrpc_request_set_replen(req);
381
382         RETURN(req);
383 }
384
385 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
386                                                      struct lookup_intent *it,
387                                                      struct md_op_data *op_data)
388 {
389         struct ptlrpc_request *req;
390         struct obd_device     *obddev = class_exp2obd(exp);
391         struct ldlm_intent    *lit;
392         int                    rc;
393         ENTRY;
394
395         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
396                                    &RQF_LDLM_INTENT_UNLINK);
397         if (req == NULL)
398                 RETURN(ERR_PTR(-ENOMEM));
399
400         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
401                              op_data->op_namelen + 1);
402
403         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
404         if (rc) {
405                 ptlrpc_request_free(req);
406                 RETURN(ERR_PTR(rc));
407         }
408
409         /* pack the intent */
410         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
411         lit->opc = (__u64)it->it_op;
412
413         /* pack the intended request */
414         mdc_unlink_pack(req, op_data);
415
416         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
417                              obddev->u.cli.cl_default_mds_easize);
418         ptlrpc_request_set_replen(req);
419         RETURN(req);
420 }
421
422 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
423                                                       struct lookup_intent *it,
424                                                       struct md_op_data *op_data)
425 {
426         struct ptlrpc_request   *req;
427         struct obd_device       *obddev = class_exp2obd(exp);
428         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
429                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
430                                          OBD_MD_MEA | OBD_MD_FLACL;
431         struct ldlm_intent      *lit;
432         int                      rc;
433         __u32                    easize;
434         ENTRY;
435
436         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437                                    &RQF_LDLM_INTENT_GETATTR);
438         if (req == NULL)
439                 RETURN(ERR_PTR(-ENOMEM));
440
441         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
442                              op_data->op_namelen + 1);
443
444         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
445         if (rc) {
446                 ptlrpc_request_free(req);
447                 RETURN(ERR_PTR(rc));
448         }
449
450         /* pack the intent */
451         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
452         lit->opc = (__u64)it->it_op;
453
454         if (obddev->u.cli.cl_default_mds_easize > 0)
455                 easize = obddev->u.cli.cl_default_mds_easize;
456         else
457                 easize = obddev->u.cli.cl_max_mds_easize;
458
459         /* pack the intended request */
460         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
461
462         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
463         ptlrpc_request_set_replen(req);
464         RETURN(req);
465 }
466
467 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
468                                                      struct lookup_intent *it,
469                                                      struct md_op_data *op_data)
470 {
471         struct obd_device     *obd = class_exp2obd(exp);
472         struct ptlrpc_request *req;
473         struct ldlm_intent    *lit;
474         struct layout_intent  *layout;
475         int rc;
476         ENTRY;
477
478         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
479                                 &RQF_LDLM_INTENT_LAYOUT);
480         if (req == NULL)
481                 RETURN(ERR_PTR(-ENOMEM));
482
483         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
484         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
485         if (rc) {
486                 ptlrpc_request_free(req);
487                 RETURN(ERR_PTR(rc));
488         }
489
490         /* pack the intent */
491         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
492         lit->opc = (__u64)it->it_op;
493
494         /* pack the layout intent request */
495         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
496         LASSERT(op_data->op_data != NULL);
497         LASSERT(op_data->op_data_size == sizeof(*layout));
498         memcpy(layout, op_data->op_data, sizeof(*layout));
499
500         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
501                              obd->u.cli.cl_default_mds_easize);
502         ptlrpc_request_set_replen(req);
503         RETURN(req);
504 }
505
506 static struct ptlrpc_request *
507 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
508 {
509         struct ptlrpc_request *req;
510         int rc;
511         ENTRY;
512
513         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
514         if (req == NULL)
515                 RETURN(ERR_PTR(-ENOMEM));
516
517         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
518         if (rc) {
519                 ptlrpc_request_free(req);
520                 RETURN(ERR_PTR(rc));
521         }
522
523         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
524         ptlrpc_request_set_replen(req);
525         RETURN(req);
526 }
527
528 static int mdc_finish_enqueue(struct obd_export *exp,
529                               struct ptlrpc_request *req,
530                               struct ldlm_enqueue_info *einfo,
531                               struct lookup_intent *it,
532                               struct lustre_handle *lockh,
533                               int rc)
534 {
535         struct req_capsule  *pill = &req->rq_pill;
536         struct ldlm_request *lockreq;
537         struct ldlm_reply   *lockrep;
538         struct ldlm_lock    *lock;
539         void                *lvb_data = NULL;
540         __u32                lvb_len = 0;
541         ENTRY;
542
543         LASSERT(rc >= 0);
544         /* Similarly, if we're going to replay this request, we don't want to
545          * actually get a lock, just perform the intent. */
546         if (req->rq_transno || req->rq_replay) {
547                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
548                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
549         }
550
551         if (rc == ELDLM_LOCK_ABORTED) {
552                 einfo->ei_mode = 0;
553                 memset(lockh, 0, sizeof(*lockh));
554                 rc = 0;
555         } else { /* rc = 0 */
556                 lock = ldlm_handle2lock(lockh);
557                 LASSERT(lock != NULL);
558
559                 /* If the server gave us back a different lock mode, we should
560                  * fix up our variables. */
561                 if (lock->l_req_mode != einfo->ei_mode) {
562                         ldlm_lock_addref(lockh, lock->l_req_mode);
563                         ldlm_lock_decref(lockh, einfo->ei_mode);
564                         einfo->ei_mode = lock->l_req_mode;
565                 }
566                 LDLM_LOCK_PUT(lock);
567         }
568
569         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
570         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
571
572         it->it_disposition = (int)lockrep->lock_policy_res1;
573         it->it_status = (int)lockrep->lock_policy_res2;
574         it->it_lock_mode = einfo->ei_mode;
575         it->it_lock_handle = lockh->cookie;
576         it->it_request = req;
577
578         /* Technically speaking rq_transno must already be zero if
579          * it_status is in error, so the check is a bit redundant */
580         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
581                 mdc_clear_replay_flag(req, it->it_status);
582
583         /* If we're doing an IT_OPEN which did not result in an actual
584          * successful open, then we need to remove the bit which saves
585          * this request for unconditional replay.
586          *
587          * It's important that we do this first!  Otherwise we might exit the
588          * function without doing so, and try to replay a failed create
589          * (bug 3440) */
590         if (it->it_op & IT_OPEN && req->rq_replay &&
591             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
592                 mdc_clear_replay_flag(req, it->it_status);
593
594         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
595                   it->it_op, it->it_disposition, it->it_status);
596
597         /* We know what to expect, so we do any byte flipping required here */
598         if (it_has_reply_body(it)) {
599                 struct mdt_body *body;
600
601                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
602                 if (body == NULL) {
603                         CERROR ("Can't swab mdt_body\n");
604                         RETURN (-EPROTO);
605                 }
606
607                 if (it_disposition(it, DISP_OPEN_OPEN) &&
608                     !it_open_error(DISP_OPEN_OPEN, it)) {
609                         /*
610                          * If this is a successful OPEN request, we need to set
611                          * replay handler and data early, so that if replay
612                          * happens immediately after swabbing below, new reply
613                          * is swabbed by that handler correctly.
614                          */
615                         mdc_set_open_replay_data(NULL, NULL, it);
616                 }
617
618                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
619                         void *eadata;
620
621                         mdc_update_max_ea_from_body(exp, body);
622
623                         /*
624                          * The eadata is opaque; just check that it is there.
625                          * Eventually, obd_unpackmd() will check the contents.
626                          */
627                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
628                                                         body->mbo_eadatasize);
629                         if (eadata == NULL)
630                                 RETURN(-EPROTO);
631
632                         /* save lvb data and length in case this is for layout
633                          * lock */
634                         lvb_data = eadata;
635                         lvb_len = body->mbo_eadatasize;
636
637                         /*
638                          * We save the reply LOV EA in case we have to replay a
639                          * create for recovery.  If we didn't allocate a large
640                          * enough request buffer above we need to reallocate it
641                          * here to hold the actual LOV EA.
642                          *
643                          * To not save LOV EA if request is not going to replay
644                          * (for example error one).
645                          */
646                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
647                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
648                                                     body->mbo_eadatasize);
649                                 if (rc) {
650                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
651                                         body->mbo_eadatasize = 0;
652                                         rc = 0;
653                                 }
654                         }
655                 }
656         } else if (it->it_op & IT_LAYOUT) {
657                 /* maybe the lock was granted right away and layout
658                  * is packed into RMF_DLM_LVB of req */
659                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
660                 if (lvb_len > 0) {
661                         lvb_data = req_capsule_server_sized_get(pill,
662                                                         &RMF_DLM_LVB, lvb_len);
663                         if (lvb_data == NULL)
664                                 RETURN(-EPROTO);
665
666                         /**
667                          * save replied layout data to the request buffer for
668                          * recovery consideration (lest MDS reinitialize
669                          * another set of OST objects).
670                          */
671                         if (req->rq_transno)
672                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
673                                                      lvb_len);
674                 }
675         }
676
677         /* fill in stripe data for layout lock.
678          * LU-6581: trust layout data only if layout lock is granted. The MDT
679          * has stopped sending layout unless the layout lock is granted. The
680          * client still does this checking in case it's talking with an old
681          * server. - Jinshan */
682         lock = ldlm_handle2lock(lockh);
683         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
684             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
685                 void *lmm;
686
687                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
688                         ldlm_it2str(it->it_op), lvb_len);
689
690                 OBD_ALLOC_LARGE(lmm, lvb_len);
691                 if (lmm == NULL) {
692                         LDLM_LOCK_PUT(lock);
693                         RETURN(-ENOMEM);
694                 }
695                 memcpy(lmm, lvb_data, lvb_len);
696
697                 /* install lvb_data */
698                 lock_res_and_lock(lock);
699                 if (lock->l_lvb_data == NULL) {
700                         lock->l_lvb_type = LVB_T_LAYOUT;
701                         lock->l_lvb_data = lmm;
702                         lock->l_lvb_len = lvb_len;
703                         lmm = NULL;
704                 }
705                 unlock_res_and_lock(lock);
706                 if (lmm != NULL)
707                         OBD_FREE_LARGE(lmm, lvb_len);
708         }
709         if (lock != NULL)
710                 LDLM_LOCK_PUT(lock);
711
712         RETURN(rc);
713 }
714
715 /* We always reserve enough space in the reply packet for a stripe MD, because
716  * we don't know in advance the file type. */
717 static int mdc_enqueue_base(struct obd_export *exp,
718                             struct ldlm_enqueue_info *einfo,
719                             const union ldlm_policy_data *policy,
720                             struct lookup_intent *it,
721                             struct md_op_data *op_data,
722                             struct lustre_handle *lockh,
723                             __u64 extra_lock_flags)
724 {
725         struct obd_device *obddev = class_exp2obd(exp);
726         struct ptlrpc_request *req = NULL;
727         __u64 flags, saved_flags = extra_lock_flags;
728         struct ldlm_res_id res_id;
729         static const union ldlm_policy_data lookup_policy = {
730                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
731         static const union ldlm_policy_data update_policy = {
732                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
733         static const union ldlm_policy_data layout_policy = {
734                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
735         static const union ldlm_policy_data getxattr_policy = {
736                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
737         int generation, resends = 0;
738         struct ldlm_reply *lockrep;
739         enum lvb_type lvb_type = 0;
740         int rc;
741         ENTRY;
742
743         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
744                  einfo->ei_type);
745         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
746
747         if (it != NULL) {
748                 LASSERT(policy == NULL);
749
750                 saved_flags |= LDLM_FL_HAS_INTENT;
751                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
752                         policy = &update_policy;
753                 else if (it->it_op & IT_LAYOUT)
754                         policy = &layout_policy;
755                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
756                         policy = &getxattr_policy;
757                 else
758                         policy = &lookup_policy;
759         }
760
761         generation = obddev->u.cli.cl_import->imp_generation;
762 resend:
763         flags = saved_flags;
764         if (it == NULL) {
765                 /* The only way right now is FLOCK. */
766                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
767                          einfo->ei_type);
768                 res_id.name[3] = LDLM_FLOCK;
769         } else if (it->it_op & IT_OPEN) {
770                 req = mdc_intent_open_pack(exp, it, op_data);
771         } else if (it->it_op & IT_UNLINK) {
772                 req = mdc_intent_unlink_pack(exp, it, op_data);
773         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
774                 req = mdc_intent_getattr_pack(exp, it, op_data);
775         } else if (it->it_op & IT_READDIR) {
776                 req = mdc_enqueue_pack(exp, 0);
777         } else if (it->it_op & IT_LAYOUT) {
778                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
779                         RETURN(-EOPNOTSUPP);
780                 req = mdc_intent_layout_pack(exp, it, op_data);
781                 lvb_type = LVB_T_LAYOUT;
782         } else if (it->it_op & IT_GETXATTR) {
783                 req = mdc_intent_getxattr_pack(exp, it, op_data);
784         } else {
785                 LBUG();
786                 RETURN(-EINVAL);
787         }
788
789         if (IS_ERR(req))
790                 RETURN(PTR_ERR(req));
791
792         if (resends) {
793                 req->rq_generation_set = 1;
794                 req->rq_import_generation = generation;
795                 req->rq_sent = cfs_time_current_sec() + resends;
796         }
797
798         /* It is important to obtain modify RPC slot first (if applicable), so
799          * that threads that are waiting for a modify RPC slot are not polluting
800          * our rpcs in flight counter.
801          * We do not do flock request limiting, though */
802         if (it) {
803                 mdc_get_mod_rpc_slot(req, it);
804                 rc = obd_get_request_slot(&obddev->u.cli);
805                 if (rc != 0) {
806                         mdc_put_mod_rpc_slot(req, it);
807                         mdc_clear_replay_flag(req, 0);
808                         ptlrpc_req_finished(req);
809                         RETURN(rc);
810                 }
811         }
812
813         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
814                               0, lvb_type, lockh, 0);
815         if (!it) {
816                 /* For flock requests we immediatelly return without further
817                    delay and let caller deal with the rest, since rest of
818                    this function metadata processing makes no sense for flock
819                    requests anyway. But in case of problem during comms with
820                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
821                    can not rely on caller and this mainly for F_UNLCKs
822                    (explicits or automatically generated by Kernel to clean
823                    current FLocks upon exit) that can't be trashed */
824                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
825                     (einfo->ei_type == LDLM_FLOCK) &&
826                     (einfo->ei_mode == LCK_NL))
827                         goto resend;
828                 RETURN(rc);
829         }
830
831         obd_put_request_slot(&obddev->u.cli);
832         mdc_put_mod_rpc_slot(req, it);
833
834         if (rc < 0) {
835                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
836                        obddev->obd_name, rc);
837
838                 mdc_clear_replay_flag(req, rc);
839                 ptlrpc_req_finished(req);
840                 RETURN(rc);
841         }
842
843         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
844         LASSERT(lockrep != NULL);
845
846         lockrep->lock_policy_res2 =
847                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
848
849         /* Retry infinitely when the server returns -EINPROGRESS for the
850          * intent operation, when server returns -EINPROGRESS for acquiring
851          * intent lock, we'll retry in after_reply(). */
852         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
853                 mdc_clear_replay_flag(req, rc);
854                 ptlrpc_req_finished(req);
855                 resends++;
856
857                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
858                        obddev->obd_name, resends, it->it_op,
859                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
860
861                 if (generation == obddev->u.cli.cl_import->imp_generation) {
862                         goto resend;
863                 } else {
864                         CDEBUG(D_HA, "resend cross eviction\n");
865                         RETURN(-EIO);
866                 }
867         }
868
869         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
870         if (rc < 0) {
871                 if (lustre_handle_is_used(lockh)) {
872                         ldlm_lock_decref(lockh, einfo->ei_mode);
873                         memset(lockh, 0, sizeof(*lockh));
874                 }
875                 ptlrpc_req_finished(req);
876
877                 it->it_lock_handle = 0;
878                 it->it_lock_mode = 0;
879                 it->it_request = NULL;
880         }
881
882         RETURN(rc);
883 }
884
885 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
886                 const union ldlm_policy_data *policy,
887                 struct md_op_data *op_data,
888                 struct lustre_handle *lockh, __u64 extra_lock_flags)
889 {
890         return mdc_enqueue_base(exp, einfo, policy, NULL,
891                                 op_data, lockh, extra_lock_flags);
892 }
893
894 static int mdc_finish_intent_lock(struct obd_export *exp,
895                                   struct ptlrpc_request *request,
896                                   struct md_op_data *op_data,
897                                   struct lookup_intent *it,
898                                   struct lustre_handle *lockh)
899 {
900         struct lustre_handle old_lock;
901         struct ldlm_lock *lock;
902         int rc = 0;
903         ENTRY;
904
905         LASSERT(request != NULL);
906         LASSERT(request != LP_POISON);
907         LASSERT(request->rq_repmsg != LP_POISON);
908
909         if (it->it_op & IT_READDIR)
910                 RETURN(0);
911
912         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
913                 if (it->it_status != 0)
914                         GOTO(out, rc = it->it_status);
915         } else {
916                 if (!it_disposition(it, DISP_IT_EXECD)) {
917                         /* The server failed before it even started executing
918                          * the intent, i.e. because it couldn't unpack the
919                          * request.
920                          */
921                         LASSERT(it->it_status != 0);
922                         GOTO(out, rc = it->it_status);
923                 }
924                 rc = it_open_error(DISP_IT_EXECD, it);
925                 if (rc)
926                         GOTO(out, rc);
927
928                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
929                 if (rc)
930                         GOTO(out, rc);
931
932                 /* keep requests around for the multiple phases of the call
933                  * this shows the DISP_XX must guarantee we make it into the
934                  * call
935                  */
936                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
937                     it_disposition(it, DISP_OPEN_CREATE) &&
938                     !it_open_error(DISP_OPEN_CREATE, it)) {
939                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
940                         /* balanced in ll_create_node */
941                         ptlrpc_request_addref(request);
942                 }
943                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
944                     it_disposition(it, DISP_OPEN_OPEN) &&
945                     !it_open_error(DISP_OPEN_OPEN, it)) {
946                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
947                         /* balanced in ll_file_open */
948                         ptlrpc_request_addref(request);
949                         /* BUG 11546 - eviction in the middle of open rpc
950                          * processing
951                          */
952                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
953                                          obd_timeout);
954                 }
955
956                 if (it->it_op & IT_CREAT) {
957                         /* XXX this belongs in ll_create_it */
958                 } else if (it->it_op == IT_OPEN) {
959                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
960                 } else {
961                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
962                 }
963         }
964
965         /* If we already have a matching lock, then cancel the new
966          * one.  We have to set the data here instead of in
967          * mdc_enqueue, because we need to use the child's inode as
968          * the l_ast_data to match, and that's not available until
969          * intent_finish has performed the iget().) */
970         lock = ldlm_handle2lock(lockh);
971         if (lock) {
972                 union ldlm_policy_data policy = lock->l_policy_data;
973                 LDLM_DEBUG(lock, "matching against this");
974
975                 if (it_has_reply_body(it)) {
976                         struct mdt_body *body;
977
978                         body = req_capsule_server_get(&request->rq_pill,
979                                                       &RMF_MDT_BODY);
980                         /* mdc_enqueue checked */
981                         LASSERT(body != NULL);
982                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
983                                                  &lock->l_resource->lr_name),
984                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
985                                  PLDLMRES(lock->l_resource),
986                                  PFID(&body->mbo_fid1));
987                 }
988                 LDLM_LOCK_PUT(lock);
989
990                 memcpy(&old_lock, lockh, sizeof(*lockh));
991                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
992                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
993                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
994                         memcpy(lockh, &old_lock, sizeof(old_lock));
995                         it->it_lock_handle = lockh->cookie;
996                 }
997         }
998
999         EXIT;
1000 out:
1001         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1002                 (int)op_data->op_namelen, op_data->op_name,
1003                 ldlm_it2str(it->it_op), it->it_status,
1004                 it->it_disposition, rc);
1005         return rc;
1006 }
1007
1008 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1009                         struct lu_fid *fid, __u64 *bits)
1010 {
1011         /* We could just return 1 immediately, but since we should only
1012          * be called in revalidate_it if we already have a lock, let's
1013          * verify that. */
1014         struct ldlm_res_id res_id;
1015         struct lustre_handle lockh;
1016         union ldlm_policy_data policy;
1017         enum ldlm_mode mode;
1018         ENTRY;
1019
1020         if (it->it_lock_handle) {
1021                 lockh.cookie = it->it_lock_handle;
1022                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1023         } else {
1024                 fid_build_reg_res_name(fid, &res_id);
1025                 switch (it->it_op) {
1026                 case IT_GETATTR:
1027                         /* File attributes are held under multiple bits:
1028                          * nlink is under lookup lock, size and times are
1029                          * under UPDATE lock and recently we've also got
1030                          * a separate permissions lock for owner/group/acl that
1031                          * were protected by lookup lock before.
1032                          * Getattr must provide all of that information,
1033                          * so we need to ensure we have all of those locks.
1034                          * Unfortunately, if the bits are split across multiple
1035                          * locks, there's no easy way to match all of them here,
1036                          * so an extra RPC would be performed to fetch all
1037                          * of those bits at once for now. */
1038                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1039                          * but for old MDTs (< 2.4), permission is covered
1040                          * by LOOKUP lock, so it needs to match all bits here.*/
1041                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1042                                                   MDS_INODELOCK_LOOKUP |
1043                                                   MDS_INODELOCK_PERM;
1044                         break;
1045                 case IT_READDIR:
1046                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1047                         break;
1048                 case IT_LAYOUT:
1049                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1050                         break;
1051                 default:
1052                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1053                         break;
1054                 }
1055
1056                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1057                                       LDLM_IBITS, &policy,
1058                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1059                                       &lockh);
1060         }
1061
1062         if (mode) {
1063                 it->it_lock_handle = lockh.cookie;
1064                 it->it_lock_mode = mode;
1065         } else {
1066                 it->it_lock_handle = 0;
1067                 it->it_lock_mode = 0;
1068         }
1069
1070         RETURN(!!mode);
1071 }
1072
1073 /*
1074  * This long block is all about fixing up the lock and request state
1075  * so that it is correct as of the moment _before_ the operation was
1076  * applied; that way, the VFS will think that everything is normal and
1077  * call Lustre's regular VFS methods.
1078  *
1079  * If we're performing a creation, that means that unless the creation
1080  * failed with EEXIST, we should fake up a negative dentry.
1081  *
1082  * For everything else, we want to lookup to succeed.
1083  *
1084  * One additional note: if CREATE or OPEN succeeded, we add an extra
1085  * reference to the request because we need to keep it around until
1086  * ll_create/ll_open gets called.
1087  *
1088  * The server will return to us, in it_disposition, an indication of
1089  * exactly what it_status refers to.
1090  *
1091  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1092  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1093  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1094  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1095  * was successful.
1096  *
1097  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1098  * child lookup.
1099  */
1100 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1101                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1102                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1103 {
1104         struct ldlm_enqueue_info einfo = {
1105                 .ei_type        = LDLM_IBITS,
1106                 .ei_mode        = it_to_lock_mode(it),
1107                 .ei_cb_bl       = cb_blocking,
1108                 .ei_cb_cp       = ldlm_completion_ast,
1109         };
1110         struct lustre_handle lockh;
1111         int rc = 0;
1112         ENTRY;
1113         LASSERT(it);
1114
1115         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1116                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1117                 op_data->op_name, PFID(&op_data->op_fid2),
1118                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1119                 it->it_flags);
1120
1121         lockh.cookie = 0;
1122         if (fid_is_sane(&op_data->op_fid2) &&
1123             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1124                 /* We could just return 1 immediately, but since we should only
1125                  * be called in revalidate_it if we already have a lock, let's
1126                  * verify that. */
1127                 it->it_lock_handle = 0;
1128                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1129                 /* Only return failure if it was not GETATTR by cfid
1130                    (from inode_revalidate) */
1131                 if (rc || op_data->op_namelen != 0)
1132                         RETURN(rc);
1133         }
1134
1135         /* For case if upper layer did not alloc fid, do it now. */
1136         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1137                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1138                 if (rc < 0) {
1139                         CERROR("Can't alloc new fid, rc %d\n", rc);
1140                         RETURN(rc);
1141                 }
1142         }
1143
1144         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1145                               extra_lock_flags);
1146         if (rc < 0)
1147                 RETURN(rc);
1148
1149         *reqp = it->it_request;
1150         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1151         RETURN(rc);
1152 }
1153
1154 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1155                                               struct ptlrpc_request *req,
1156                                               void *args, int rc)
1157 {
1158         struct mdc_getattr_args  *ga = args;
1159         struct obd_export        *exp = ga->ga_exp;
1160         struct md_enqueue_info   *minfo = ga->ga_minfo;
1161         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1162         struct lookup_intent     *it;
1163         struct lustre_handle     *lockh;
1164         struct obd_device        *obddev;
1165         struct ldlm_reply        *lockrep;
1166         __u64                     flags = LDLM_FL_HAS_INTENT;
1167         ENTRY;
1168
1169         it    = &minfo->mi_it;
1170         lockh = &minfo->mi_lockh;
1171
1172         obddev = class_exp2obd(exp);
1173
1174         obd_put_request_slot(&obddev->u.cli);
1175         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1176                 rc = -ETIMEDOUT;
1177
1178         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1179                                    &flags, NULL, 0, lockh, rc);
1180         if (rc < 0) {
1181                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1182                 mdc_clear_replay_flag(req, rc);
1183                 GOTO(out, rc);
1184         }
1185
1186         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1187         LASSERT(lockrep != NULL);
1188
1189         lockrep->lock_policy_res2 =
1190                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1191
1192         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1193         if (rc)
1194                 GOTO(out, rc);
1195
1196         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1197         EXIT;
1198
1199 out:
1200         minfo->mi_cb(req, minfo, rc);
1201         return 0;
1202 }
1203
1204 int mdc_intent_getattr_async(struct obd_export *exp,
1205                              struct md_enqueue_info *minfo)
1206 {
1207         struct md_op_data       *op_data = &minfo->mi_data;
1208         struct lookup_intent    *it = &minfo->mi_it;
1209         struct ptlrpc_request   *req;
1210         struct mdc_getattr_args *ga;
1211         struct obd_device       *obddev = class_exp2obd(exp);
1212         struct ldlm_res_id       res_id;
1213         union ldlm_policy_data policy = {
1214                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1215                                                  MDS_INODELOCK_UPDATE } };
1216         int                      rc = 0;
1217         __u64                    flags = LDLM_FL_HAS_INTENT;
1218         ENTRY;
1219
1220         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1221                 (int)op_data->op_namelen, op_data->op_name,
1222                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1223
1224         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1225         req = mdc_intent_getattr_pack(exp, it, op_data);
1226         if (IS_ERR(req))
1227                 RETURN(PTR_ERR(req));
1228
1229         rc = obd_get_request_slot(&obddev->u.cli);
1230         if (rc != 0) {
1231                 ptlrpc_req_finished(req);
1232                 RETURN(rc);
1233         }
1234
1235         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1236                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1237         if (rc < 0) {
1238                 obd_put_request_slot(&obddev->u.cli);
1239                 ptlrpc_req_finished(req);
1240                 RETURN(rc);
1241         }
1242
1243         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1244         ga = ptlrpc_req_async_args(req);
1245         ga->ga_exp = exp;
1246         ga->ga_minfo = minfo;
1247
1248         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1249         ptlrpcd_add_req(req);
1250
1251         RETURN(0);
1252 }