Whamcloud - gitweb
LU-2446 build: Update Whamcloud copyright messages for Intel
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Argument bundle for a getattr enqueue: the export the request goes
 * through plus the MD-level and LDLM-level enqueue descriptors.
 * NOTE(review): the users of this struct are outside the visible part of
 * the file -- presumably it is the async-args block of an intent getattr.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export used for the RPC */
        struct md_enqueue_info *ga_minfo;       /* MD enqueue descriptor */
        struct ldlm_enqueue_info *ga_einfo;     /* LDLM enqueue descriptor */
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         ENTRY;
122
123         if(bits)
124                 *bits = 0;
125
126         if (!*lockh)
127                 RETURN(0);
128
129         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130
131         LASSERT(lock != NULL);
132         lock_res_and_lock(lock);
133 #ifdef __KERNEL__
134         if (lock->l_ast_data && lock->l_ast_data != data) {
135                 struct inode *new_inode = data;
136                 struct inode *old_inode = lock->l_ast_data;
137                 LASSERTF(old_inode->i_state & I_FREEING,
138                          "Found existing inode %p/%lu/%u state %lu in lock: "
139                          "setting data to %p/%lu/%u\n", old_inode,
140                          old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state,
142                          new_inode, new_inode->i_ino, new_inode->i_generation);
143         }
144 #endif
145         lock->l_ast_data = data;
146         if (bits)
147                 *bits = lock->l_policy_data.l_inodebits.bits;
148
149         unlock_res_and_lock(lock);
150         LDLM_LOCK_PUT(lock);
151
152         RETURN(0);
153 }
154
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156                            const struct lu_fid *fid, ldlm_type_t type,
157                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
158                            struct lustre_handle *lockh)
159 {
160         struct ldlm_res_id res_id;
161         ldlm_mode_t rc;
162         ENTRY;
163
164         fid_build_reg_res_name(fid, &res_id);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         RETURN(rc);
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         ENTRY;
182
183         fid_build_reg_res_name(fid, &res_id);
184         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185                                              policy, mode, flags, opaque);
186         RETURN(rc);
187 }
188
189 int mdc_change_cbdata(struct obd_export *exp,
190                       const struct lu_fid *fid,
191                       ldlm_iterator_t it, void *data)
192 {
193         struct ldlm_res_id res_id;
194         ENTRY;
195
196         fid_build_reg_res_name(fid, &res_id);
197         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
198                               &res_id, it, data);
199
200         EXIT;
201         return 0;
202 }
203
204 /* find any ldlm lock of the inode in mdc
205  * return 0    not find
206  *        1    find one
207  *      < 0    error */
208 int mdc_find_cbdata(struct obd_export *exp,
209                     const struct lu_fid *fid,
210                     ldlm_iterator_t it, void *data)
211 {
212         struct ldlm_res_id res_id;
213         int rc = 0;
214         ENTRY;
215
216         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218                                    it, data);
219         if (rc == LDLM_ITER_STOP)
220                 RETURN(1);
221         else if (rc == LDLM_ITER_CONTINUE)
222                 RETURN(0);
223         RETURN(rc);
224 }
225
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 {
228         /* Don't hold error requests for replay. */
229         if (req->rq_replay) {
230                 spin_lock(&req->rq_lock);
231                 req->rq_replay = 0;
232                 spin_unlock(&req->rq_lock);
233         }
234         if (rc && req->rq_transno != 0) {
235                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
236                 LBUG();
237         }
238 }
239
240 /* Save a large LOV EA into the request buffer so that it is available
241  * for replay.  We don't do this in the initial request because the
242  * original request doesn't need this buffer (at most it sends just the
243  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244  * buffer and may also be difficult to allocate and save a very large
245  * request buffer for each open. (bug 5707)
246  *
247  * OOM here may cause recovery failure if lmm is needed (only for the
248  * original open if the MDS crashed just when this client also OOM'd)
249  * but this is incredibly unlikely, and questionable whether the client
250  * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252                                 struct mdt_body *body)
253 {
254         int     rc;
255
256         /* FIXME: remove this explicit offset. */
257         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
258                                         body->eadatasize);
259         if (rc) {
260                 CERROR("Can't enlarge segment %d size to %d\n",
261                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
262                 body->valid &= ~OBD_MD_FLEASIZE;
263                 body->eadatasize = 0;
264         }
265 }
266
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268                                                    struct lookup_intent *it,
269                                                    struct md_op_data *op_data,
270                                                    void *lmm, int lmmsize,
271                                                    void *cb_data)
272 {
273         struct ptlrpc_request *req;
274         struct obd_device     *obddev = class_exp2obd(exp);
275         struct ldlm_intent    *lit;
276         CFS_LIST_HEAD(cancels);
277         int                    count = 0;
278         int                    mode;
279         int                    rc;
280         ENTRY;
281
282         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283
284         /* XXX: openlock is not cancelled for cross-refs. */
285         /* If inode is known, cancel conflicting OPEN locks. */
286         if (fid_is_sane(&op_data->op_fid2)) {
287                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288                         mode = LCK_CW;
289 #ifdef FMODE_EXEC
290                 else if (it->it_flags & FMODE_EXEC)
291                         mode = LCK_PR;
292 #endif
293                 else
294                         mode = LCK_CR;
295                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296                                                 &cancels, mode,
297                                                 MDS_INODELOCK_OPEN);
298         }
299
300         /* If CREATE, cancel parent's UPDATE lock. */
301         if (it->it_op & IT_CREAT)
302                 mode = LCK_EX;
303         else
304                 mode = LCK_CR;
305         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306                                          &cancels, mode,
307                                          MDS_INODELOCK_UPDATE);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310                                    &RQF_LDLM_INTENT_OPEN);
311         if (req == NULL) {
312                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313                 RETURN(ERR_PTR(-ENOMEM));
314         }
315
316         /* parent capability */
317         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318         /* child capability, reserve the size according to parent capa, it will
319          * be filled after we get the reply */
320         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323                              op_data->op_namelen + 1);
324         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326
327         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return NULL;
331         }
332
333         spin_lock(&req->rq_lock);
334         req->rq_replay = req->rq_import->imp_replayable;
335         spin_unlock(&req->rq_lock);
336
337         /* pack the intent */
338         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339         lit->opc = (__u64)it->it_op;
340
341         /* pack the intended request */
342         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
343                       lmmsize);
344
345         /* for remote client, fetch remote perm for current user */
346         if (client_is_remote(exp))
347                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348                                      sizeof(struct mdt_remote_perm));
349         ptlrpc_request_set_replen(req);
350         return req;
351 }
352
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354                                                      struct lookup_intent *it,
355                                                      struct md_op_data *op_data)
356 {
357         struct ptlrpc_request *req;
358         struct obd_device     *obddev = class_exp2obd(exp);
359         struct ldlm_intent    *lit;
360         int                    rc;
361         ENTRY;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364                                    &RQF_LDLM_INTENT_UNLINK);
365         if (req == NULL)
366                 RETURN(ERR_PTR(-ENOMEM));
367
368         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                              op_data->op_namelen + 1);
371
372         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373         if (rc) {
374                 ptlrpc_request_free(req);
375                 RETURN(ERR_PTR(rc));
376         }
377
378         /* pack the intent */
379         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380         lit->opc = (__u64)it->it_op;
381
382         /* pack the intended request */
383         mdc_unlink_pack(req, op_data);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386                              obddev->u.cli.cl_max_mds_easize);
387         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388                              obddev->u.cli.cl_max_mds_cookiesize);
389         ptlrpc_request_set_replen(req);
390         RETURN(req);
391 }
392
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394                                                       struct lookup_intent *it,
395                                                       struct md_op_data *op_data)
396 {
397         struct ptlrpc_request *req;
398         struct obd_device     *obddev = class_exp2obd(exp);
399         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402                                        (client_is_remote(exp) ?
403                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404         struct ldlm_intent    *lit;
405         int                    rc;
406         ENTRY;
407
408         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409                                    &RQF_LDLM_INTENT_GETATTR);
410         if (req == NULL)
411                 RETURN(ERR_PTR(-ENOMEM));
412
413         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415                              op_data->op_namelen + 1);
416
417         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418         if (rc) {
419                 ptlrpc_request_free(req);
420                 RETURN(ERR_PTR(rc));
421         }
422
423         /* pack the intent */
424         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425         lit->opc = (__u64)it->it_op;
426
427         /* pack the intended request */
428         mdc_getattr_pack(req, valid, it->it_flags, op_data,
429                          obddev->u.cli.cl_max_mds_easize);
430
431         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432                              obddev->u.cli.cl_max_mds_easize);
433         if (client_is_remote(exp))
434                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435                                      sizeof(struct mdt_remote_perm));
436         ptlrpc_request_set_replen(req);
437         RETURN(req);
438 }
439
440 static struct ptlrpc_request *
441 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
442 {
443         struct ptlrpc_request *req;
444         int rc;
445         ENTRY;
446
447         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
448         if (req == NULL)
449                 RETURN(ERR_PTR(-ENOMEM));
450
451         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452         if (rc) {
453                 ptlrpc_request_free(req);
454                 RETURN(ERR_PTR(rc));
455         }
456
457         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
458         ptlrpc_request_set_replen(req);
459         RETURN(req);
460 }
461
462 static int mdc_finish_enqueue(struct obd_export *exp,
463                               struct ptlrpc_request *req,
464                               struct ldlm_enqueue_info *einfo,
465                               struct lookup_intent *it,
466                               struct lustre_handle *lockh,
467                               int rc)
468 {
469         struct req_capsule  *pill = &req->rq_pill;
470         struct ldlm_request *lockreq;
471         struct ldlm_reply   *lockrep;
472         struct lustre_intent_data *intent = &it->d.lustre;
473         ENTRY;
474
475         LASSERT(rc >= 0);
476         /* Similarly, if we're going to replay this request, we don't want to
477          * actually get a lock, just perform the intent. */
478         if (req->rq_transno || req->rq_replay) {
479                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
480                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
481         }
482
483         if (rc == ELDLM_LOCK_ABORTED) {
484                 einfo->ei_mode = 0;
485                 memset(lockh, 0, sizeof(*lockh));
486                 rc = 0;
487         } else { /* rc = 0 */
488                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
489                 LASSERT(lock);
490
491                 /* If the server gave us back a different lock mode, we should
492                  * fix up our variables. */
493                 if (lock->l_req_mode != einfo->ei_mode) {
494                         ldlm_lock_addref(lockh, lock->l_req_mode);
495                         ldlm_lock_decref(lockh, einfo->ei_mode);
496                         einfo->ei_mode = lock->l_req_mode;
497                 }
498                 LDLM_LOCK_PUT(lock);
499         }
500
501         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
502         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
503
504         intent->it_disposition = (int)lockrep->lock_policy_res1;
505         intent->it_status = (int)lockrep->lock_policy_res2;
506         intent->it_lock_mode = einfo->ei_mode;
507         intent->it_lock_handle = lockh->cookie;
508         intent->it_data = req;
509
510         /* Technically speaking rq_transno must already be zero if
511          * it_status is in error, so the check is a bit redundant */
512         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
513                 mdc_clear_replay_flag(req, intent->it_status);
514
515         /* If we're doing an IT_OPEN which did not result in an actual
516          * successful open, then we need to remove the bit which saves
517          * this request for unconditional replay.
518          *
519          * It's important that we do this first!  Otherwise we might exit the
520          * function without doing so, and try to replay a failed create
521          * (bug 3440) */
522         if (it->it_op & IT_OPEN && req->rq_replay &&
523             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
524                 mdc_clear_replay_flag(req, intent->it_status);
525
526         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527                   it->it_op, intent->it_disposition, intent->it_status);
528
529         /* We know what to expect, so we do any byte flipping required here */
530         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531                 struct mdt_body *body;
532
533                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
534                 if (body == NULL) {
535                         CERROR ("Can't swab mdt_body\n");
536                         RETURN (-EPROTO);
537                 }
538
539                 if (it_disposition(it, DISP_OPEN_OPEN) &&
540                     !it_open_error(DISP_OPEN_OPEN, it)) {
541                         /*
542                          * If this is a successful OPEN request, we need to set
543                          * replay handler and data early, so that if replay
544                          * happens immediately after swabbing below, new reply
545                          * is swabbed by that handler correctly.
546                          */
547                         mdc_set_open_replay_data(NULL, NULL, req);
548                 }
549
550                 /* TODO: make sure LAYOUT lock must be granted along with EA */
551
552                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
553                         void *eadata;
554
555                          mdc_update_max_ea_from_body(exp, body);
556
557                         /*
558                          * The eadata is opaque; just check that it is there.
559                          * Eventually, obd_unpackmd() will check the contents.
560                          */
561                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
562                                                               body->eadatasize);
563                         if (eadata == NULL)
564                                 RETURN(-EPROTO);
565
566                         /*
567                          * We save the reply LOV EA in case we have to replay a
568                          * create for recovery.  If we didn't allocate a large
569                          * enough request buffer above we need to reallocate it
570                          * here to hold the actual LOV EA.
571                          *
572                          * To not save LOV EA if request is not going to replay
573                          * (for example error one).
574                          */
575                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
576                                 void *lmm;
577                                 if (req_capsule_get_size(pill, &RMF_EADATA,
578                                                          RCL_CLIENT) <
579                                     body->eadatasize)
580                                         mdc_realloc_openmsg(req, body);
581                                 else
582                                         req_capsule_shrink(pill, &RMF_EADATA,
583                                                            body->eadatasize,
584                                                            RCL_CLIENT);
585
586                                 req_capsule_set_size(pill, &RMF_EADATA,
587                                                      RCL_CLIENT,
588                                                      body->eadatasize);
589
590                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
591                                 if (lmm)
592                                         memcpy(lmm, eadata, body->eadatasize);
593                         }
594                 }
595
596                 if (body->valid & OBD_MD_FLRMTPERM) {
597                         struct mdt_remote_perm *perm;
598
599                         LASSERT(client_is_remote(exp));
600                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
601                                                 lustre_swab_mdt_remote_perm);
602                         if (perm == NULL)
603                                 RETURN(-EPROTO);
604                 }
605                 if (body->valid & OBD_MD_FLMDSCAPA) {
606                         struct lustre_capa *capa, *p;
607
608                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
609                         if (capa == NULL)
610                                 RETURN(-EPROTO);
611
612                         if (it->it_op & IT_OPEN) {
613                                 /* client fid capa will be checked in replay */
614                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
615                                 LASSERT(p);
616                                 *p = *capa;
617                         }
618                 }
619                 if (body->valid & OBD_MD_FLOSSCAPA) {
620                         struct lustre_capa *capa;
621
622                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
623                         if (capa == NULL)
624                                 RETURN(-EPROTO);
625                 }
626         } else if (it->it_op & IT_LAYOUT) {
627                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
628
629                 if (lock != NULL && lock->l_lvb_data == NULL) {
630                         int lvb_len;
631
632                         /* maybe the lock was granted right away and layout
633                          * is packed into RMF_DLM_LVB of req */
634                         lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
635                                                        RCL_SERVER);
636                         if (lvb_len > 0) {
637                                 void *lvb;
638                                 void *lmm;
639
640                                 lvb = req_capsule_server_sized_get(pill,
641                                                         &RMF_DLM_LVB, lvb_len);
642                                 if (lvb == NULL) {
643                                         LDLM_LOCK_PUT(lock);
644                                         RETURN(-EPROTO);
645                                 }
646
647                                 OBD_ALLOC_LARGE(lmm, lvb_len);
648                                 if (lmm == NULL) {
649                                         LDLM_LOCK_PUT(lock);
650                                         RETURN(-ENOMEM);
651                                 }
652                                 memcpy(lmm, lvb, lvb_len);
653
654                                 /* install lvb_data */
655                                 lock_res_and_lock(lock);
656                                 LASSERT(lock->l_lvb_data == NULL);
657                                 lock->l_lvb_data = lmm;
658                                 lock->l_lvb_len = lvb_len;
659                                 unlock_res_and_lock(lock);
660                         }
661                 }
662                 if (lock != NULL)
663                         LDLM_LOCK_PUT(lock);
664         }
665
666         RETURN(rc);
667 }
668
669 /* We always reserve enough space in the reply packet for a stripe MD, because
670  * we don't know in advance the file type. */
671 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
672                 struct lookup_intent *it, struct md_op_data *op_data,
673                 struct lustre_handle *lockh, void *lmm, int lmmsize,
674                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
675 {
676         struct obd_device     *obddev = class_exp2obd(exp);
677         struct ptlrpc_request *req = NULL;
678         __u64                  flags, saved_flags = extra_lock_flags;
679         int                    rc;
680         struct ldlm_res_id res_id;
681         static const ldlm_policy_data_t lookup_policy =
682                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
683         static const ldlm_policy_data_t update_policy =
684                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
685         static const ldlm_policy_data_t layout_policy =
686                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
687         ldlm_policy_data_t const *policy = &lookup_policy;
688         int                    generation, resends = 0;
689         struct ldlm_reply     *lockrep;
690         enum lvb_type          lvb_type = 0;
691         ENTRY;
692
693         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
694                  einfo->ei_type);
695
696         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
697
698         if (it) {
699                 saved_flags |= LDLM_FL_HAS_INTENT;
700                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
701                         policy = &update_policy;
702                 else if (it->it_op & IT_LAYOUT)
703                         policy = &layout_policy;
704         }
705
706         LASSERT(reqp == NULL);
707
708         generation = obddev->u.cli.cl_import->imp_generation;
709 resend:
710         flags = saved_flags;
711         if (!it) {
712                 /* The only way right now is FLOCK, in this case we hide flock
713                    policy as lmm, but lmmsize is 0 */
714                 LASSERT(lmm && lmmsize == 0);
715                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
716                          einfo->ei_type);
717                 policy = (ldlm_policy_data_t *)lmm;
718                 res_id.name[3] = LDLM_FLOCK;
719         } else if (it->it_op & IT_OPEN) {
720                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
721                                            einfo->ei_cbdata);
722                 policy = &update_policy;
723                 einfo->ei_cbdata = NULL;
724                 lmm = NULL;
725         } else if (it->it_op & IT_UNLINK) {
726                 req = mdc_intent_unlink_pack(exp, it, op_data);
727         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
728                 req = mdc_intent_getattr_pack(exp, it, op_data);
729         } else if (it->it_op & IT_READDIR) {
730                 req = mdc_enqueue_pack(exp, 0);
731         } else if (it->it_op & IT_LAYOUT) {
732                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
733                         RETURN(-EOPNOTSUPP);
734
735                 req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
736                 lvb_type = LVB_T_LAYOUT;
737         } else {
738                 LBUG();
739                 RETURN(-EINVAL);
740         }
741
742         if (IS_ERR(req))
743                 RETURN(PTR_ERR(req));
744
745         if (req != NULL && it && it->it_op & IT_CREAT)
746                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
747                  * retry logic */
748                 req->rq_no_retry_einprogress = 1;
749
750         if (resends) {
751                 req->rq_generation_set = 1;
752                 req->rq_import_generation = generation;
753                 req->rq_sent = cfs_time_current_sec() + resends;
754         }
755
756         /* It is important to obtain rpc_lock first (if applicable), so that
757          * threads that are serialised with rpc_lock are not polluting our
758          * rpcs in flight counter. We do not do flock request limiting, though*/
759         if (it) {
760                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
761                 rc = mdc_enter_request(&obddev->u.cli);
762                 if (rc != 0) {
763                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
764                         mdc_clear_replay_flag(req, 0);
765                         ptlrpc_req_finished(req);
766                         RETURN(rc);
767                 }
768         }
769
770         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
771                               0, lvb_type, lockh, 0);
772         if (!it) {
773                 /* For flock requests we immediately return without further
774                    delay and let the caller deal with the rest, since the
775                    metadata processing in the rest of this function makes no
776                    sense for flock requests anyway */
777                 RETURN(rc);
778         }
779
780         mdc_exit_request(&obddev->u.cli);
781         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
782
783         if (rc < 0) {
784                 CERROR("ldlm_cli_enqueue: %d\n", rc);
785                 mdc_clear_replay_flag(req, rc);
786                 ptlrpc_req_finished(req);
787                 RETURN(rc);
788         }
789
790         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
791         LASSERT(lockrep != NULL);
792
793         /* Retry the create infinitely when we get -EINPROGRESS from
794          * server. This is required by the new quota design. */
795         if (it && it->it_op & IT_CREAT &&
796             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
797                 mdc_clear_replay_flag(req, rc);
798                 ptlrpc_req_finished(req);
799                 resends++;
800
801                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
802                        obddev->obd_name, resends, it->it_op,
803                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
804
805                 if (generation == obddev->u.cli.cl_import->imp_generation) {
806                         goto resend;
807                 } else {
808                         CDEBUG(D_HA, "resend cross eviction\n");
809                         RETURN(-EIO);
810                 }
811         }
812
813         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
814
815         RETURN(rc);
816 }
817
818 static int mdc_finish_intent_lock(struct obd_export *exp,
819                                   struct ptlrpc_request *request,
820                                   struct md_op_data *op_data,
821                                   struct lookup_intent *it,
822                                   struct lustre_handle *lockh)
823 {
824         struct lustre_handle old_lock;
825         struct mdt_body *mdt_body;
826         struct ldlm_lock *lock;
827         int rc;
828
829
830         LASSERT(request != NULL);
831         LASSERT(request != LP_POISON);
832         LASSERT(request->rq_repmsg != LP_POISON);
833
834         if (!it_disposition(it, DISP_IT_EXECD)) {
835                 /* The server failed before it even started executing the
836                  * intent, i.e. because it couldn't unpack the request. */
837                 LASSERT(it->d.lustre.it_status != 0);
838                 RETURN(it->d.lustre.it_status);
839         }
840         rc = it_open_error(DISP_IT_EXECD, it);
841         if (rc)
842                 RETURN(rc);
843
844         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
845         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
846
847         /* If we were revalidating a fid/name pair, mark the intent in
848          * case we fail and get called again from lookup */
849         if (fid_is_sane(&op_data->op_fid2) &&
850             it->it_create_mode & M_CHECK_STALE &&
851             it->it_op != IT_GETATTR) {
852                 it_set_disposition(it, DISP_ENQ_COMPLETE);
853
854                 /* Also: did we find the same inode? */
855                 /* sever can return one of two fids:
856                  * op_fid2 - new allocated fid - if file is created.
857                  * op_fid3 - existent fid - if file only open.
858                  * op_fid3 is saved in lmv_intent_open */
859                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
860                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
861                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
862                                "\n", PFID(&op_data->op_fid2),
863                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
864                         RETURN(-ESTALE);
865                 }
866         }
867
868         rc = it_open_error(DISP_LOOKUP_EXECD, it);
869         if (rc)
870                 RETURN(rc);
871
872         /* keep requests around for the multiple phases of the call
873          * this shows the DISP_XX must guarantee we make it into the call
874          */
875         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
876             it_disposition(it, DISP_OPEN_CREATE) &&
877             !it_open_error(DISP_OPEN_CREATE, it)) {
878                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
879                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
880         }
881         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
882             it_disposition(it, DISP_OPEN_OPEN) &&
883             !it_open_error(DISP_OPEN_OPEN, it)) {
884                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
885                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
886                 /* BUG 11546 - eviction in the middle of open rpc processing */
887                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
888         }
889
890         if (it->it_op & IT_CREAT) {
891                 /* XXX this belongs in ll_create_it */
892         } else if (it->it_op == IT_OPEN) {
893                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
894         } else {
895                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
896         }
897
898         /* If we already have a matching lock, then cancel the new
899          * one.  We have to set the data here instead of in
900          * mdc_enqueue, because we need to use the child's inode as
901          * the l_ast_data to match, and that's not available until
902          * intent_finish has performed the iget().) */
903         lock = ldlm_handle2lock(lockh);
904         if (lock) {
905                 ldlm_policy_data_t policy = lock->l_policy_data;
906                 LDLM_DEBUG(lock, "matching against this");
907
908                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
909                                          &lock->l_resource->lr_name),
910                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
911                          (unsigned long)lock->l_resource->lr_name.name[0],
912                          (unsigned long)lock->l_resource->lr_name.name[1],
913                          (unsigned long)lock->l_resource->lr_name.name[2],
914                          (unsigned long)fid_seq(&mdt_body->fid1),
915                          (unsigned long)fid_oid(&mdt_body->fid1),
916                          (unsigned long)fid_ver(&mdt_body->fid1));
917                 LDLM_LOCK_PUT(lock);
918
919                 memcpy(&old_lock, lockh, sizeof(*lockh));
920                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
921                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
922                         ldlm_lock_decref_and_cancel(lockh,
923                                                     it->d.lustre.it_lock_mode);
924                         memcpy(lockh, &old_lock, sizeof(old_lock));
925                         it->d.lustre.it_lock_handle = lockh->cookie;
926                 }
927         }
928         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
929                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
930                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
931         RETURN(rc);
932 }
933
934 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
935                         struct lu_fid *fid, __u64 *bits)
936 {
937         /* We could just return 1 immediately, but since we should only
938          * be called in revalidate_it if we already have a lock, let's
939          * verify that. */
940         struct ldlm_res_id res_id;
941         struct lustre_handle lockh;
942         ldlm_policy_data_t policy;
943         ldlm_mode_t mode;
944         ENTRY;
945
946         if (it->d.lustre.it_lock_handle) {
947                 lockh.cookie = it->d.lustre.it_lock_handle;
948                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
949         } else {
950                 fid_build_reg_res_name(fid, &res_id);
951                 switch (it->it_op) {
952                 case IT_GETATTR:
953                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
954                         break;
955                 case IT_LAYOUT:
956                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
957                         break;
958                 default:
959                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
960                         break;
961                 }
962                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
963                                        LDLM_FL_BLOCK_GRANTED, &res_id,
964                                        LDLM_IBITS, &policy,
965                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
966         }
967
968         if (mode) {
969                 it->d.lustre.it_lock_handle = lockh.cookie;
970                 it->d.lustre.it_lock_mode = mode;
971         } else {
972                 it->d.lustre.it_lock_handle = 0;
973                 it->d.lustre.it_lock_mode = 0;
974         }
975
976         RETURN(!!mode);
977 }
978
979 /*
980  * This long block is all about fixing up the lock and request state
981  * so that it is correct as of the moment _before_ the operation was
982  * applied; that way, the VFS will think that everything is normal and
983  * call Lustre's regular VFS methods.
984  *
985  * If we're performing a creation, that means that unless the creation
986  * failed with EEXIST, we should fake up a negative dentry.
987  *
988  * For everything else, we want to lookup to succeed.
989  *
990  * One additional note: if CREATE or OPEN succeeded, we add an extra
991  * reference to the request because we need to keep it around until
992  * ll_create/ll_open gets called.
993  *
994  * The server will return to us, in it_disposition, an indication of
995  * exactly what d.lustre.it_status refers to.
996  *
997  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
998  * otherwise if DISP_OPEN_CREATE is set, then it status is the
999  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1000  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1001  * was successful.
1002  *
1003  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1004  * child lookup.
1005  */
1006 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1007                     void *lmm, int lmmsize, struct lookup_intent *it,
1008                     int lookup_flags, struct ptlrpc_request **reqp,
1009                     ldlm_blocking_callback cb_blocking,
1010                     __u64 extra_lock_flags)
1011 {
1012         struct lustre_handle lockh;
1013         int rc = 0;
1014         ENTRY;
1015         LASSERT(it);
1016
1017         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1018                ", intent: %s flags %#o\n", op_data->op_namelen,
1019                op_data->op_name, PFID(&op_data->op_fid2),
1020                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1021                it->it_flags);
1022
1023         lockh.cookie = 0;
1024         if (fid_is_sane(&op_data->op_fid2) &&
1025             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
1026                 /* We could just return 1 immediately, but since we should only
1027                  * be called in revalidate_it if we already have a lock, let's
1028                  * verify that. */
1029                 it->d.lustre.it_lock_handle = 0;
1030                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1031                 /* Only return failure if it was not GETATTR by cfid
1032                    (from inode_revalidate) */
1033                 if (rc || op_data->op_namelen != 0)
1034                         RETURN(rc);
1035         }
1036
1037         /* lookup_it may be called only after revalidate_it has run, because
1038          * revalidate_it cannot return errors, only zero.  Returning zero causes
1039          * this call to lookup, which *can* return an error.
1040          *
1041          * We only want to execute the request associated with the intent one
1042          * time, however, so don't send the request again.  Instead, skip past
1043          * this and use the request from revalidate.  In this case, revalidate
1044          * never dropped its reference, so the refcounts are all OK */
1045         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1046                 struct ldlm_enqueue_info einfo =
1047                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1048                           ldlm_completion_ast, NULL, NULL, NULL };
1049
1050                 /* For case if upper layer did not alloc fid, do it now. */
1051                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1052                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1053                         if (rc < 0) {
1054                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1055                                 RETURN(rc);
1056                         }
1057                 }
1058                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1059                                  lmm, lmmsize, NULL, extra_lock_flags);
1060                 if (rc < 0)
1061                         RETURN(rc);
1062         } else if (!fid_is_sane(&op_data->op_fid2) ||
1063                    !(it->it_create_mode & M_CHECK_STALE)) {
1064                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1065                  * request referenced from this intent, saved for subsequent
1066                  * lookup.  This path is executed when we proceed to this
1067                  * lookup, so we clear DISP_ENQ_COMPLETE */
1068                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1069         }
1070         *reqp = it->d.lustre.it_data;
1071         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1072         RETURN(rc);
1073 }
1074
1075 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1076                                               struct ptlrpc_request *req,
1077                                               void *args, int rc)
1078 {
1079         struct mdc_getattr_args  *ga = args;
1080         struct obd_export        *exp = ga->ga_exp;
1081         struct md_enqueue_info   *minfo = ga->ga_minfo;
1082         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1083         struct lookup_intent     *it;
1084         struct lustre_handle     *lockh;
1085         struct obd_device        *obddev;
1086         __u64                     flags = LDLM_FL_HAS_INTENT;
1087         ENTRY;
1088
1089         it    = &minfo->mi_it;
1090         lockh = &minfo->mi_lockh;
1091
1092         obddev = class_exp2obd(exp);
1093
1094         mdc_exit_request(&obddev->u.cli);
1095         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1096                 rc = -ETIMEDOUT;
1097
1098         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1099                                    &flags, NULL, 0, lockh, rc);
1100         if (rc < 0) {
1101                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1102                 mdc_clear_replay_flag(req, rc);
1103                 GOTO(out, rc);
1104         }
1105
1106         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1107         if (rc)
1108                 GOTO(out, rc);
1109
1110         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1111         EXIT;
1112
1113 out:
1114         OBD_FREE_PTR(einfo);
1115         minfo->mi_cb(req, minfo, rc);
1116         return 0;
1117 }
1118
1119 int mdc_intent_getattr_async(struct obd_export *exp,
1120                              struct md_enqueue_info *minfo,
1121                              struct ldlm_enqueue_info *einfo)
1122 {
1123         struct md_op_data       *op_data = &minfo->mi_data;
1124         struct lookup_intent    *it = &minfo->mi_it;
1125         struct ptlrpc_request   *req;
1126         struct mdc_getattr_args *ga;
1127         struct obd_device       *obddev = class_exp2obd(exp);
1128         struct ldlm_res_id       res_id;
1129         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1130          *     for statahead currently. Consider CMD in future, such two bits
1131          *     maybe managed by different MDS, should be adjusted then. */
1132         ldlm_policy_data_t       policy = {
1133                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1134                                                          MDS_INODELOCK_UPDATE }
1135                                  };
1136         int                      rc = 0;
1137         __u64                    flags = LDLM_FL_HAS_INTENT;
1138         ENTRY;
1139
1140         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1141                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1142                ldlm_it2str(it->it_op), it->it_flags);
1143
1144         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1145         req = mdc_intent_getattr_pack(exp, it, op_data);
1146         if (!req)
1147                 RETURN(-ENOMEM);
1148
1149         rc = mdc_enter_request(&obddev->u.cli);
1150         if (rc != 0) {
1151                 ptlrpc_req_finished(req);
1152                 RETURN(rc);
1153         }
1154
1155         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1156                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1157         if (rc < 0) {
1158                 mdc_exit_request(&obddev->u.cli);
1159                 ptlrpc_req_finished(req);
1160                 RETURN(rc);
1161         }
1162
1163         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1164         ga = ptlrpc_req_async_args(req);
1165         ga->ga_exp = exp;
1166         ga->ga_minfo = minfo;
1167         ga->ga_einfo = einfo;
1168
1169         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1170         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1171
1172         RETURN(0);
1173 }