Whamcloud - gitweb
LU-1876 hsm: layout lock implementation on server side
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Bundle of an export, an md_enqueue_info and an ldlm_enqueue_info.
 * NOTE(review): presumably the argument block handed to an asynchronous
 * getattr completion handler -- the user is not visible in this chunk.
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;
        struct md_enqueue_info   *ga_minfo;
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         ENTRY;
122
123         if(bits)
124                 *bits = 0;
125
126         if (!*lockh)
127                 RETURN(0);
128
129         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130
131         LASSERT(lock != NULL);
132         lock_res_and_lock(lock);
133 #ifdef __KERNEL__
134         if (lock->l_ast_data && lock->l_ast_data != data) {
135                 struct inode *new_inode = data;
136                 struct inode *old_inode = lock->l_ast_data;
137                 LASSERTF(old_inode->i_state & I_FREEING,
138                          "Found existing inode %p/%lu/%u state %lu in lock: "
139                          "setting data to %p/%lu/%u\n", old_inode,
140                          old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state,
142                          new_inode, new_inode->i_ino, new_inode->i_generation);
143         }
144 #endif
145         lock->l_ast_data = data;
146         if (bits)
147                 *bits = lock->l_policy_data.l_inodebits.bits;
148
149         unlock_res_and_lock(lock);
150         LDLM_LOCK_PUT(lock);
151
152         RETURN(0);
153 }
154
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156                            const struct lu_fid *fid, ldlm_type_t type,
157                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
158                            struct lustre_handle *lockh)
159 {
160         struct ldlm_res_id res_id;
161         ldlm_mode_t rc;
162         ENTRY;
163
164         fid_build_reg_res_name(fid, &res_id);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         RETURN(rc);
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         ENTRY;
182
183         fid_build_reg_res_name(fid, &res_id);
184         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185                                              policy, mode, flags, opaque);
186         RETURN(rc);
187 }
188
189 int mdc_change_cbdata(struct obd_export *exp,
190                       const struct lu_fid *fid,
191                       ldlm_iterator_t it, void *data)
192 {
193         struct ldlm_res_id res_id;
194         ENTRY;
195
196         fid_build_reg_res_name(fid, &res_id);
197         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
198                               &res_id, it, data);
199
200         EXIT;
201         return 0;
202 }
203
204 /* find any ldlm lock of the inode in mdc
205  * return 0    not find
206  *        1    find one
207  *      < 0    error */
208 int mdc_find_cbdata(struct obd_export *exp,
209                     const struct lu_fid *fid,
210                     ldlm_iterator_t it, void *data)
211 {
212         struct ldlm_res_id res_id;
213         int rc = 0;
214         ENTRY;
215
216         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218                                    it, data);
219         if (rc == LDLM_ITER_STOP)
220                 RETURN(1);
221         else if (rc == LDLM_ITER_CONTINUE)
222                 RETURN(0);
223         RETURN(rc);
224 }
225
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 {
228         /* Don't hold error requests for replay. */
229         if (req->rq_replay) {
230                 spin_lock(&req->rq_lock);
231                 req->rq_replay = 0;
232                 spin_unlock(&req->rq_lock);
233         }
234         if (rc && req->rq_transno != 0) {
235                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
236                 LBUG();
237         }
238 }
239
240 /* Save a large LOV EA into the request buffer so that it is available
241  * for replay.  We don't do this in the initial request because the
242  * original request doesn't need this buffer (at most it sends just the
243  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244  * buffer and may also be difficult to allocate and save a very large
245  * request buffer for each open. (bug 5707)
246  *
247  * OOM here may cause recovery failure if lmm is needed (only for the
248  * original open if the MDS crashed just when this client also OOM'd)
249  * but this is incredibly unlikely, and questionable whether the client
250  * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252                                 struct mdt_body *body)
253 {
254         int     rc;
255
256         /* FIXME: remove this explicit offset. */
257         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
258                                         body->eadatasize);
259         if (rc) {
260                 CERROR("Can't enlarge segment %d size to %d\n",
261                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
262                 body->valid &= ~OBD_MD_FLEASIZE;
263                 body->eadatasize = 0;
264         }
265 }
266
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268                                                    struct lookup_intent *it,
269                                                    struct md_op_data *op_data,
270                                                    void *lmm, int lmmsize,
271                                                    void *cb_data)
272 {
273         struct ptlrpc_request *req;
274         struct obd_device     *obddev = class_exp2obd(exp);
275         struct ldlm_intent    *lit;
276         CFS_LIST_HEAD(cancels);
277         int                    count = 0;
278         int                    mode;
279         int                    rc;
280         ENTRY;
281
282         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283
284         /* XXX: openlock is not cancelled for cross-refs. */
285         /* If inode is known, cancel conflicting OPEN locks. */
286         if (fid_is_sane(&op_data->op_fid2)) {
287                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288                         mode = LCK_CW;
289 #ifdef FMODE_EXEC
290                 else if (it->it_flags & FMODE_EXEC)
291                         mode = LCK_PR;
292 #endif
293                 else
294                         mode = LCK_CR;
295                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296                                                 &cancels, mode,
297                                                 MDS_INODELOCK_OPEN);
298         }
299
300         /* If CREATE, cancel parent's UPDATE lock. */
301         if (it->it_op & IT_CREAT)
302                 mode = LCK_EX;
303         else
304                 mode = LCK_CR;
305         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306                                          &cancels, mode,
307                                          MDS_INODELOCK_UPDATE);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310                                    &RQF_LDLM_INTENT_OPEN);
311         if (req == NULL) {
312                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313                 RETURN(ERR_PTR(-ENOMEM));
314         }
315
316         /* parent capability */
317         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318         /* child capability, reserve the size according to parent capa, it will
319          * be filled after we get the reply */
320         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323                              op_data->op_namelen + 1);
324         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326
327         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return NULL;
331         }
332
333         spin_lock(&req->rq_lock);
334         req->rq_replay = req->rq_import->imp_replayable;
335         spin_unlock(&req->rq_lock);
336
337         /* pack the intent */
338         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339         lit->opc = (__u64)it->it_op;
340
341         /* pack the intended request */
342         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
343                       lmmsize);
344
345         /* for remote client, fetch remote perm for current user */
346         if (client_is_remote(exp))
347                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348                                      sizeof(struct mdt_remote_perm));
349         ptlrpc_request_set_replen(req);
350         return req;
351 }
352
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354                                                      struct lookup_intent *it,
355                                                      struct md_op_data *op_data)
356 {
357         struct ptlrpc_request *req;
358         struct obd_device     *obddev = class_exp2obd(exp);
359         struct ldlm_intent    *lit;
360         int                    rc;
361         ENTRY;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364                                    &RQF_LDLM_INTENT_UNLINK);
365         if (req == NULL)
366                 RETURN(ERR_PTR(-ENOMEM));
367
368         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                              op_data->op_namelen + 1);
371
372         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373         if (rc) {
374                 ptlrpc_request_free(req);
375                 RETURN(ERR_PTR(rc));
376         }
377
378         /* pack the intent */
379         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380         lit->opc = (__u64)it->it_op;
381
382         /* pack the intended request */
383         mdc_unlink_pack(req, op_data);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386                              obddev->u.cli.cl_max_mds_easize);
387         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388                              obddev->u.cli.cl_max_mds_cookiesize);
389         ptlrpc_request_set_replen(req);
390         RETURN(req);
391 }
392
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394                                                       struct lookup_intent *it,
395                                                       struct md_op_data *op_data)
396 {
397         struct ptlrpc_request *req;
398         struct obd_device     *obddev = class_exp2obd(exp);
399         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402                                        (client_is_remote(exp) ?
403                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404         struct ldlm_intent    *lit;
405         int                    rc;
406         ENTRY;
407
408         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409                                    &RQF_LDLM_INTENT_GETATTR);
410         if (req == NULL)
411                 RETURN(ERR_PTR(-ENOMEM));
412
413         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415                              op_data->op_namelen + 1);
416
417         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418         if (rc) {
419                 ptlrpc_request_free(req);
420                 RETURN(ERR_PTR(rc));
421         }
422
423         /* pack the intent */
424         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425         lit->opc = (__u64)it->it_op;
426
427         /* pack the intended request */
428         mdc_getattr_pack(req, valid, it->it_flags, op_data,
429                          obddev->u.cli.cl_max_mds_easize);
430
431         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432                              obddev->u.cli.cl_max_mds_easize);
433         if (client_is_remote(exp))
434                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435                                      sizeof(struct mdt_remote_perm));
436         ptlrpc_request_set_replen(req);
437         RETURN(req);
438 }
439
440 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
441                                                      struct lookup_intent *it,
442                                                      struct md_op_data *unused)
443 {
444         struct obd_device     *obd = class_exp2obd(exp);
445         struct ptlrpc_request *req;
446         struct ldlm_intent    *lit;
447         struct layout_intent  *layout;
448         int rc;
449         ENTRY;
450
451         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
452                                 &RQF_LDLM_INTENT_LAYOUT);
453         if (req == NULL)
454                 RETURN(ERR_PTR(-ENOMEM));
455
456         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
457         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
458         if (rc) {
459                 ptlrpc_request_free(req);
460                 RETURN(ERR_PTR(rc));
461         }
462
463         /* pack the intent */
464         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
465         lit->opc = (__u64)it->it_op;
466
467         /* pack the layout intent request */
468         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
469         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
470          * set for replication */
471         layout->li_opc = LAYOUT_INTENT_ACCESS;
472
473         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
474                         obd->u.cli.cl_max_mds_easize);
475         ptlrpc_request_set_replen(req);
476         RETURN(req);
477 }
478
479 static struct ptlrpc_request *
480 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
481 {
482         struct ptlrpc_request *req;
483         int rc;
484         ENTRY;
485
486         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
487         if (req == NULL)
488                 RETURN(ERR_PTR(-ENOMEM));
489
490         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
491         if (rc) {
492                 ptlrpc_request_free(req);
493                 RETURN(ERR_PTR(rc));
494         }
495
496         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
497         ptlrpc_request_set_replen(req);
498         RETURN(req);
499 }
500
501 static int mdc_finish_enqueue(struct obd_export *exp,
502                               struct ptlrpc_request *req,
503                               struct ldlm_enqueue_info *einfo,
504                               struct lookup_intent *it,
505                               struct lustre_handle *lockh,
506                               int rc)
507 {
508         struct req_capsule  *pill = &req->rq_pill;
509         struct ldlm_request *lockreq;
510         struct ldlm_reply   *lockrep;
511         struct lustre_intent_data *intent = &it->d.lustre;
512         struct ldlm_lock    *lock;
513         void                *lvb_data = NULL;
514         int                  lvb_len = 0;
515         ENTRY;
516
517         LASSERT(rc >= 0);
518         /* Similarly, if we're going to replay this request, we don't want to
519          * actually get a lock, just perform the intent. */
520         if (req->rq_transno || req->rq_replay) {
521                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
522                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
523         }
524
525         if (rc == ELDLM_LOCK_ABORTED) {
526                 einfo->ei_mode = 0;
527                 memset(lockh, 0, sizeof(*lockh));
528                 rc = 0;
529         } else { /* rc = 0 */
530                 lock = ldlm_handle2lock(lockh);
531                 LASSERT(lock != NULL);
532
533                 /* If the server gave us back a different lock mode, we should
534                  * fix up our variables. */
535                 if (lock->l_req_mode != einfo->ei_mode) {
536                         ldlm_lock_addref(lockh, lock->l_req_mode);
537                         ldlm_lock_decref(lockh, einfo->ei_mode);
538                         einfo->ei_mode = lock->l_req_mode;
539                 }
540                 LDLM_LOCK_PUT(lock);
541         }
542
543         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
544         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
545
546         intent->it_disposition = (int)lockrep->lock_policy_res1;
547         intent->it_status = (int)lockrep->lock_policy_res2;
548         intent->it_lock_mode = einfo->ei_mode;
549         intent->it_lock_handle = lockh->cookie;
550         intent->it_data = req;
551
552         /* Technically speaking rq_transno must already be zero if
553          * it_status is in error, so the check is a bit redundant */
554         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
555                 mdc_clear_replay_flag(req, intent->it_status);
556
557         /* If we're doing an IT_OPEN which did not result in an actual
558          * successful open, then we need to remove the bit which saves
559          * this request for unconditional replay.
560          *
561          * It's important that we do this first!  Otherwise we might exit the
562          * function without doing so, and try to replay a failed create
563          * (bug 3440) */
564         if (it->it_op & IT_OPEN && req->rq_replay &&
565             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
566                 mdc_clear_replay_flag(req, intent->it_status);
567
568         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
569                   it->it_op, intent->it_disposition, intent->it_status);
570
571         /* We know what to expect, so we do any byte flipping required here */
572         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
573                 struct mdt_body *body;
574
575                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
576                 if (body == NULL) {
577                         CERROR ("Can't swab mdt_body\n");
578                         RETURN (-EPROTO);
579                 }
580
581                 if (it_disposition(it, DISP_OPEN_OPEN) &&
582                     !it_open_error(DISP_OPEN_OPEN, it)) {
583                         /*
584                          * If this is a successful OPEN request, we need to set
585                          * replay handler and data early, so that if replay
586                          * happens immediately after swabbing below, new reply
587                          * is swabbed by that handler correctly.
588                          */
589                         mdc_set_open_replay_data(NULL, NULL, req);
590                 }
591
592                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
593                         void *eadata;
594
595                         mdc_update_max_ea_from_body(exp, body);
596
597                         /*
598                          * The eadata is opaque; just check that it is there.
599                          * Eventually, obd_unpackmd() will check the contents.
600                          */
601                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
602                                                               body->eadatasize);
603                         if (eadata == NULL)
604                                 RETURN(-EPROTO);
605
606                         /* save lvb data and length in case this is for layout
607                          * lock */
608                         lvb_data = eadata;
609                         lvb_len = body->eadatasize;
610
611                         /*
612                          * We save the reply LOV EA in case we have to replay a
613                          * create for recovery.  If we didn't allocate a large
614                          * enough request buffer above we need to reallocate it
615                          * here to hold the actual LOV EA.
616                          *
617                          * To not save LOV EA if request is not going to replay
618                          * (for example error one).
619                          */
620                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
621                                 void *lmm;
622                                 if (req_capsule_get_size(pill, &RMF_EADATA,
623                                                          RCL_CLIENT) <
624                                     body->eadatasize)
625                                         mdc_realloc_openmsg(req, body);
626                                 else
627                                         req_capsule_shrink(pill, &RMF_EADATA,
628                                                            body->eadatasize,
629                                                            RCL_CLIENT);
630
631                                 req_capsule_set_size(pill, &RMF_EADATA,
632                                                      RCL_CLIENT,
633                                                      body->eadatasize);
634
635                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
636                                 if (lmm)
637                                         memcpy(lmm, eadata, body->eadatasize);
638                         }
639                 }
640
641                 if (body->valid & OBD_MD_FLRMTPERM) {
642                         struct mdt_remote_perm *perm;
643
644                         LASSERT(client_is_remote(exp));
645                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
646                                                 lustre_swab_mdt_remote_perm);
647                         if (perm == NULL)
648                                 RETURN(-EPROTO);
649                 }
650                 if (body->valid & OBD_MD_FLMDSCAPA) {
651                         struct lustre_capa *capa, *p;
652
653                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
654                         if (capa == NULL)
655                                 RETURN(-EPROTO);
656
657                         if (it->it_op & IT_OPEN) {
658                                 /* client fid capa will be checked in replay */
659                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
660                                 LASSERT(p);
661                                 *p = *capa;
662                         }
663                 }
664                 if (body->valid & OBD_MD_FLOSSCAPA) {
665                         struct lustre_capa *capa;
666
667                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
668                         if (capa == NULL)
669                                 RETURN(-EPROTO);
670                 }
671         } else if (it->it_op & IT_LAYOUT) {
672                 /* maybe the lock was granted right away and layout
673                  * is packed into RMF_DLM_LVB of req */
674                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
675                 if (lvb_len > 0) {
676                         lvb_data = req_capsule_server_sized_get(pill,
677                                                         &RMF_DLM_LVB, lvb_len);
678                         if (lvb_data == NULL)
679                                 RETURN(-EPROTO);
680                 }
681         }
682
683         /* fill in stripe data for layout lock */
684         lock = ldlm_handle2lock(lockh);
685         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
686                 void *lmm;
687
688                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
689                         ldlm_it2str(it->it_op), lvb_len);
690
691                 OBD_ALLOC_LARGE(lmm, lvb_len);
692                 if (lmm == NULL) {
693                         LDLM_LOCK_PUT(lock);
694                         RETURN(-ENOMEM);
695                 }
696                 memcpy(lmm, lvb_data, lvb_len);
697
698                 /* install lvb_data */
699                 lock_res_and_lock(lock);
700                 if (lock->l_lvb_data == NULL) {
701                         lock->l_lvb_data = lmm;
702                         lock->l_lvb_len = lvb_len;
703                         lmm = NULL;
704                 }
705                 unlock_res_and_lock(lock);
706                 if (lmm != NULL)
707                         OBD_FREE_LARGE(lmm, lvb_len);
708         }
709         if (lock != NULL)
710                 LDLM_LOCK_PUT(lock);
711
712         RETURN(rc);
713 }
714
715 /* We always reserve enough space in the reply packet for a stripe MD, because
716  * we don't know in advance the file type. */
717 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
718                 struct lookup_intent *it, struct md_op_data *op_data,
719                 struct lustre_handle *lockh, void *lmm, int lmmsize,
720                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
721 {
722         struct obd_device     *obddev = class_exp2obd(exp);
723         struct ptlrpc_request *req = NULL;
724         __u64                  flags, saved_flags = extra_lock_flags;
725         int                    rc;
726         struct ldlm_res_id res_id;
727         static const ldlm_policy_data_t lookup_policy =
728                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
729         static const ldlm_policy_data_t update_policy =
730                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
731         static const ldlm_policy_data_t layout_policy =
732                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
733         ldlm_policy_data_t const *policy = &lookup_policy;
734         int                    generation, resends = 0;
735         struct ldlm_reply     *lockrep;
736         enum lvb_type          lvb_type = 0;
737         ENTRY;
738
739         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
740                  einfo->ei_type);
741
742         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
743
744         if (it) {
745                 saved_flags |= LDLM_FL_HAS_INTENT;
746                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
747                         policy = &update_policy;
748                 else if (it->it_op & IT_LAYOUT)
749                         policy = &layout_policy;
750         }
751
752         LASSERT(reqp == NULL);
753
754         generation = obddev->u.cli.cl_import->imp_generation;
755 resend:
756         flags = saved_flags;
757         if (!it) {
758                 /* The only way right now is FLOCK, in this case we hide flock
759                    policy as lmm, but lmmsize is 0 */
760                 LASSERT(lmm && lmmsize == 0);
761                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
762                          einfo->ei_type);
763                 policy = (ldlm_policy_data_t *)lmm;
764                 res_id.name[3] = LDLM_FLOCK;
765         } else if (it->it_op & IT_OPEN) {
766                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
767                                            einfo->ei_cbdata);
768                 policy = &update_policy;
769                 einfo->ei_cbdata = NULL;
770                 lmm = NULL;
771         } else if (it->it_op & IT_UNLINK) {
772                 req = mdc_intent_unlink_pack(exp, it, op_data);
773         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
774                 req = mdc_intent_getattr_pack(exp, it, op_data);
775         } else if (it->it_op & IT_READDIR) {
776                 req = mdc_enqueue_pack(exp, 0);
777         } else if (it->it_op & IT_LAYOUT) {
778                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
779                         RETURN(-EOPNOTSUPP);
780
781                 req = mdc_intent_layout_pack(exp, it, op_data);
782                 lvb_type = LVB_T_LAYOUT;
783         } else {
784                 LBUG();
785                 RETURN(-EINVAL);
786         }
787
788         if (IS_ERR(req))
789                 RETURN(PTR_ERR(req));
790
791         if (req != NULL && it && it->it_op & IT_CREAT)
792                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
793                  * retry logic */
794                 req->rq_no_retry_einprogress = 1;
795
796         if (resends) {
797                 req->rq_generation_set = 1;
798                 req->rq_import_generation = generation;
799                 req->rq_sent = cfs_time_current_sec() + resends;
800         }
801
802         /* It is important to obtain rpc_lock first (if applicable), so that
803          * threads that are serialised with rpc_lock are not polluting our
804          * rpcs in flight counter. We do not do flock request limiting, though*/
805         if (it) {
806                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
807                 rc = mdc_enter_request(&obddev->u.cli);
808                 if (rc != 0) {
809                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
810                         mdc_clear_replay_flag(req, 0);
811                         ptlrpc_req_finished(req);
812                         RETURN(rc);
813                 }
814         }
815
816         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
817                               0, lvb_type, lockh, 0);
818         if (!it) {
819                 /* For flock requests we immediatelly return without further
820                    delay and let caller deal with the rest, since rest of
821                    this function metadata processing makes no sense for flock
822                    requests anyway */
823                 RETURN(rc);
824         }
825
826         mdc_exit_request(&obddev->u.cli);
827         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
828
829         if (rc < 0) {
830                 CERROR("ldlm_cli_enqueue: %d\n", rc);
831                 mdc_clear_replay_flag(req, rc);
832                 ptlrpc_req_finished(req);
833                 RETURN(rc);
834         }
835
836         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
837         LASSERT(lockrep != NULL);
838
839         /* Retry the create infinitely when we get -EINPROGRESS from
840          * server. This is required by the new quota design. */
841         if (it && it->it_op & IT_CREAT &&
842             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843                 mdc_clear_replay_flag(req, rc);
844                 ptlrpc_req_finished(req);
845                 resends++;
846
847                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848                        obddev->obd_name, resends, it->it_op,
849                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
850
851                 if (generation == obddev->u.cli.cl_import->imp_generation) {
852                         goto resend;
853                 } else {
854                         CDEBUG(D_HA, "resend cross eviction\n");
855                         RETURN(-EIO);
856                 }
857         }
858
859         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
860         if (rc < 0) {
861                 if (lustre_handle_is_used(lockh)) {
862                         ldlm_lock_decref(lockh, einfo->ei_mode);
863                         memset(lockh, 0, sizeof(*lockh));
864                 }
865                 ptlrpc_req_finished(req);
866         }
867         RETURN(rc);
868 }
869
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871                                   struct ptlrpc_request *request,
872                                   struct md_op_data *op_data,
873                                   struct lookup_intent *it,
874                                   struct lustre_handle *lockh)
875 {
876         struct lustre_handle old_lock;
877         struct mdt_body *mdt_body;
878         struct ldlm_lock *lock;
879         int rc;
880
881
882         LASSERT(request != NULL);
883         LASSERT(request != LP_POISON);
884         LASSERT(request->rq_repmsg != LP_POISON);
885
886         if (!it_disposition(it, DISP_IT_EXECD)) {
887                 /* The server failed before it even started executing the
888                  * intent, i.e. because it couldn't unpack the request. */
889                 LASSERT(it->d.lustre.it_status != 0);
890                 RETURN(it->d.lustre.it_status);
891         }
892         rc = it_open_error(DISP_IT_EXECD, it);
893         if (rc)
894                 RETURN(rc);
895
896         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
897         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
898
899         /* If we were revalidating a fid/name pair, mark the intent in
900          * case we fail and get called again from lookup */
901         if (fid_is_sane(&op_data->op_fid2) &&
902             it->it_create_mode & M_CHECK_STALE &&
903             it->it_op != IT_GETATTR) {
904                 it_set_disposition(it, DISP_ENQ_COMPLETE);
905
906                 /* Also: did we find the same inode? */
907                 /* sever can return one of two fids:
908                  * op_fid2 - new allocated fid - if file is created.
909                  * op_fid3 - existent fid - if file only open.
910                  * op_fid3 is saved in lmv_intent_open */
911                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
912                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
913                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
914                                "\n", PFID(&op_data->op_fid2),
915                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
916                         RETURN(-ESTALE);
917                 }
918         }
919
920         rc = it_open_error(DISP_LOOKUP_EXECD, it);
921         if (rc)
922                 RETURN(rc);
923
924         /* keep requests around for the multiple phases of the call
925          * this shows the DISP_XX must guarantee we make it into the call
926          */
927         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
928             it_disposition(it, DISP_OPEN_CREATE) &&
929             !it_open_error(DISP_OPEN_CREATE, it)) {
930                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
931                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
932         }
933         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
934             it_disposition(it, DISP_OPEN_OPEN) &&
935             !it_open_error(DISP_OPEN_OPEN, it)) {
936                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
937                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
938                 /* BUG 11546 - eviction in the middle of open rpc processing */
939                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
940         }
941
942         if (it->it_op & IT_CREAT) {
943                 /* XXX this belongs in ll_create_it */
944         } else if (it->it_op == IT_OPEN) {
945                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
946         } else {
947                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
948         }
949
950         /* If we already have a matching lock, then cancel the new
951          * one.  We have to set the data here instead of in
952          * mdc_enqueue, because we need to use the child's inode as
953          * the l_ast_data to match, and that's not available until
954          * intent_finish has performed the iget().) */
955         lock = ldlm_handle2lock(lockh);
956         if (lock) {
957                 ldlm_policy_data_t policy = lock->l_policy_data;
958                 LDLM_DEBUG(lock, "matching against this");
959
960                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
961                                          &lock->l_resource->lr_name),
962                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
963                          (unsigned long)lock->l_resource->lr_name.name[0],
964                          (unsigned long)lock->l_resource->lr_name.name[1],
965                          (unsigned long)lock->l_resource->lr_name.name[2],
966                          (unsigned long)fid_seq(&mdt_body->fid1),
967                          (unsigned long)fid_oid(&mdt_body->fid1),
968                          (unsigned long)fid_ver(&mdt_body->fid1));
969                 LDLM_LOCK_PUT(lock);
970
971                 memcpy(&old_lock, lockh, sizeof(*lockh));
972                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
973                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
974                         ldlm_lock_decref_and_cancel(lockh,
975                                                     it->d.lustre.it_lock_mode);
976                         memcpy(lockh, &old_lock, sizeof(old_lock));
977                         it->d.lustre.it_lock_handle = lockh->cookie;
978                 }
979         }
980         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
981                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
982                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
983         RETURN(rc);
984 }
985
986 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
987                         struct lu_fid *fid, __u64 *bits)
988 {
989         /* We could just return 1 immediately, but since we should only
990          * be called in revalidate_it if we already have a lock, let's
991          * verify that. */
992         struct ldlm_res_id res_id;
993         struct lustre_handle lockh;
994         ldlm_policy_data_t policy;
995         ldlm_mode_t mode;
996         ENTRY;
997
998         if (it->d.lustre.it_lock_handle) {
999                 lockh.cookie = it->d.lustre.it_lock_handle;
1000                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1001         } else {
1002                 fid_build_reg_res_name(fid, &res_id);
1003                 switch (it->it_op) {
1004                 case IT_GETATTR:
1005                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1006                         break;
1007                 case IT_LAYOUT:
1008                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1009                         break;
1010                 default:
1011                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1012                         break;
1013                 }
1014                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1015                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1016                                        LDLM_IBITS, &policy,
1017                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1018         }
1019
1020         if (mode) {
1021                 it->d.lustre.it_lock_handle = lockh.cookie;
1022                 it->d.lustre.it_lock_mode = mode;
1023         } else {
1024                 it->d.lustre.it_lock_handle = 0;
1025                 it->d.lustre.it_lock_mode = 0;
1026         }
1027
1028         RETURN(!!mode);
1029 }
1030
1031 /*
1032  * This long block is all about fixing up the lock and request state
1033  * so that it is correct as of the moment _before_ the operation was
1034  * applied; that way, the VFS will think that everything is normal and
1035  * call Lustre's regular VFS methods.
1036  *
1037  * If we're performing a creation, that means that unless the creation
1038  * failed with EEXIST, we should fake up a negative dentry.
1039  *
1040  * For everything else, we want to lookup to succeed.
1041  *
1042  * One additional note: if CREATE or OPEN succeeded, we add an extra
1043  * reference to the request because we need to keep it around until
1044  * ll_create/ll_open gets called.
1045  *
1046  * The server will return to us, in it_disposition, an indication of
1047  * exactly what d.lustre.it_status refers to.
1048  *
1049  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1050  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1051  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1052  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1053  * was successful.
1054  *
1055  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1056  * child lookup.
1057  */
1058 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1059                     void *lmm, int lmmsize, struct lookup_intent *it,
1060                     int lookup_flags, struct ptlrpc_request **reqp,
1061                     ldlm_blocking_callback cb_blocking,
1062                     __u64 extra_lock_flags)
1063 {
1064         struct lustre_handle lockh;
1065         int rc = 0;
1066         ENTRY;
1067         LASSERT(it);
1068
1069         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1070                ", intent: %s flags %#o\n", op_data->op_namelen,
1071                op_data->op_name, PFID(&op_data->op_fid2),
1072                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1073                it->it_flags);
1074
1075         lockh.cookie = 0;
1076         if (fid_is_sane(&op_data->op_fid2) &&
1077             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1078                 /* We could just return 1 immediately, but since we should only
1079                  * be called in revalidate_it if we already have a lock, let's
1080                  * verify that. */
1081                 it->d.lustre.it_lock_handle = 0;
1082                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1083                 /* Only return failure if it was not GETATTR by cfid
1084                    (from inode_revalidate) */
1085                 if (rc || op_data->op_namelen != 0)
1086                         RETURN(rc);
1087         }
1088
1089         /* lookup_it may be called only after revalidate_it has run, because
1090          * revalidate_it cannot return errors, only zero.  Returning zero causes
1091          * this call to lookup, which *can* return an error.
1092          *
1093          * We only want to execute the request associated with the intent one
1094          * time, however, so don't send the request again.  Instead, skip past
1095          * this and use the request from revalidate.  In this case, revalidate
1096          * never dropped its reference, so the refcounts are all OK */
1097         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1098                 struct ldlm_enqueue_info einfo =
1099                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1100                           ldlm_completion_ast, NULL, NULL, NULL };
1101
1102                 /* For case if upper layer did not alloc fid, do it now. */
1103                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1104                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1105                         if (rc < 0) {
1106                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1107                                 RETURN(rc);
1108                         }
1109                 }
1110                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1111                                  lmm, lmmsize, NULL, extra_lock_flags);
1112                 if (rc < 0)
1113                         RETURN(rc);
1114         } else if (!fid_is_sane(&op_data->op_fid2) ||
1115                    !(it->it_create_mode & M_CHECK_STALE)) {
1116                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1117                  * request referenced from this intent, saved for subsequent
1118                  * lookup.  This path is executed when we proceed to this
1119                  * lookup, so we clear DISP_ENQ_COMPLETE */
1120                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1121         }
1122         *reqp = it->d.lustre.it_data;
1123         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1124         RETURN(rc);
1125 }
1126
1127 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1128                                               struct ptlrpc_request *req,
1129                                               void *args, int rc)
1130 {
1131         struct mdc_getattr_args  *ga = args;
1132         struct obd_export        *exp = ga->ga_exp;
1133         struct md_enqueue_info   *minfo = ga->ga_minfo;
1134         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1135         struct lookup_intent     *it;
1136         struct lustre_handle     *lockh;
1137         struct obd_device        *obddev;
1138         __u64                     flags = LDLM_FL_HAS_INTENT;
1139         ENTRY;
1140
1141         it    = &minfo->mi_it;
1142         lockh = &minfo->mi_lockh;
1143
1144         obddev = class_exp2obd(exp);
1145
1146         mdc_exit_request(&obddev->u.cli);
1147         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1148                 rc = -ETIMEDOUT;
1149
1150         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1151                                    &flags, NULL, 0, lockh, rc);
1152         if (rc < 0) {
1153                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1154                 mdc_clear_replay_flag(req, rc);
1155                 GOTO(out, rc);
1156         }
1157
1158         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1159         if (rc)
1160                 GOTO(out, rc);
1161
1162         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1163         EXIT;
1164
1165 out:
1166         OBD_FREE_PTR(einfo);
1167         minfo->mi_cb(req, minfo, rc);
1168         return 0;
1169 }
1170
1171 int mdc_intent_getattr_async(struct obd_export *exp,
1172                              struct md_enqueue_info *minfo,
1173                              struct ldlm_enqueue_info *einfo)
1174 {
1175         struct md_op_data       *op_data = &minfo->mi_data;
1176         struct lookup_intent    *it = &minfo->mi_it;
1177         struct ptlrpc_request   *req;
1178         struct mdc_getattr_args *ga;
1179         struct obd_device       *obddev = class_exp2obd(exp);
1180         struct ldlm_res_id       res_id;
1181         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1182          *     for statahead currently. Consider CMD in future, such two bits
1183          *     maybe managed by different MDS, should be adjusted then. */
1184         ldlm_policy_data_t       policy = {
1185                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1186                                                          MDS_INODELOCK_UPDATE }
1187                                  };
1188         int                      rc = 0;
1189         __u64                    flags = LDLM_FL_HAS_INTENT;
1190         ENTRY;
1191
1192         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1193                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1194                ldlm_it2str(it->it_op), it->it_flags);
1195
1196         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1197         req = mdc_intent_getattr_pack(exp, it, op_data);
1198         if (!req)
1199                 RETURN(-ENOMEM);
1200
1201         rc = mdc_enter_request(&obddev->u.cli);
1202         if (rc != 0) {
1203                 ptlrpc_req_finished(req);
1204                 RETURN(rc);
1205         }
1206
1207         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1208                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1209         if (rc < 0) {
1210                 mdc_exit_request(&obddev->u.cli);
1211                 ptlrpc_req_finished(req);
1212                 RETURN(rc);
1213         }
1214
1215         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1216         ga = ptlrpc_req_async_args(req);
1217         ga->ga_exp = exp;
1218         ga->ga_minfo = minfo;
1219         ga->ga_einfo = einfo;
1220
1221         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1222         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1223
1224         RETURN(0);
1225 }