Whamcloud - gitweb
LU-3200 mdc: layout lock rpc must not take rpc_lock
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Argument bundle tying an export to the metadata-enqueue and lock-enqueue
 * descriptors of one getattr request.
 * NOTE(review): no user of this struct is visible in this part of the file;
 * presumably it is carried to an enqueue completion callback -- confirm.
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;       /* export the request goes to */
        struct md_enqueue_info   *ga_minfo;     /* metadata enqueue descriptor */
        struct ldlm_enqueue_info *ga_einfo;     /* lock enqueue descriptor */
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         struct inode *new_inode = data;
122         ENTRY;
123
124         if(bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 RETURN(0);
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134 #ifdef __KERNEL__
135         if (lock->l_resource->lr_lvb_inode &&
136             lock->l_resource->lr_lvb_inode != data) {
137                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: "
140                          "setting data to %p/%lu/%u\n", old_inode,
141                          old_inode->i_ino, old_inode->i_generation,
142                          old_inode->i_state,
143                          new_inode, new_inode->i_ino, new_inode->i_generation);
144         }
145 #endif
146         lock->l_resource->lr_lvb_inode = new_inode;
147         if (bits)
148                 *bits = lock->l_policy_data.l_inodebits.bits;
149
150         unlock_res_and_lock(lock);
151         LDLM_LOCK_PUT(lock);
152
153         RETURN(0);
154 }
155
156 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
157                            const struct lu_fid *fid, ldlm_type_t type,
158                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
159                            struct lustre_handle *lockh)
160 {
161         struct ldlm_res_id res_id;
162         ldlm_mode_t rc;
163         ENTRY;
164
165         fid_build_reg_res_name(fid, &res_id);
166         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
167                              &res_id, type, policy, mode, lockh, 0);
168         RETURN(rc);
169 }
170
171 int mdc_cancel_unused(struct obd_export *exp,
172                       const struct lu_fid *fid,
173                       ldlm_policy_data_t *policy,
174                       ldlm_mode_t mode,
175                       ldlm_cancel_flags_t flags,
176                       void *opaque)
177 {
178         struct ldlm_res_id res_id;
179         struct obd_device *obd = class_exp2obd(exp);
180         int rc;
181
182         ENTRY;
183
184         fid_build_reg_res_name(fid, &res_id);
185         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
186                                              policy, mode, flags, opaque);
187         RETURN(rc);
188 }
189
190 int mdc_null_inode(struct obd_export *exp,
191                    const struct lu_fid *fid)
192 {
193         struct ldlm_res_id res_id;
194         struct ldlm_resource *res;
195         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
196         ENTRY;
197
198         LASSERTF(ns != NULL, "no namespace passed\n");
199
200         fid_build_reg_res_name(fid, &res_id);
201
202         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
203         if(res == NULL)
204                 RETURN(0);
205
206         lock_res(res);
207         res->lr_lvb_inode = NULL;
208         unlock_res(res);
209
210         ldlm_resource_putref(res);
211         RETURN(0);
212 }
213
214 /* find any ldlm lock of the inode in mdc
215  * return 0    not find
216  *        1    find one
217  *      < 0    error */
218 int mdc_find_cbdata(struct obd_export *exp,
219                     const struct lu_fid *fid,
220                     ldlm_iterator_t it, void *data)
221 {
222         struct ldlm_res_id res_id;
223         int rc = 0;
224         ENTRY;
225
226         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
227         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
228                                    it, data);
229         if (rc == LDLM_ITER_STOP)
230                 RETURN(1);
231         else if (rc == LDLM_ITER_CONTINUE)
232                 RETURN(0);
233         RETURN(rc);
234 }
235
236 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
237 {
238         /* Don't hold error requests for replay. */
239         if (req->rq_replay) {
240                 spin_lock(&req->rq_lock);
241                 req->rq_replay = 0;
242                 spin_unlock(&req->rq_lock);
243         }
244         if (rc && req->rq_transno != 0) {
245                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
246                 LBUG();
247         }
248 }
249
250 /* Save a large LOV EA into the request buffer so that it is available
251  * for replay.  We don't do this in the initial request because the
252  * original request doesn't need this buffer (at most it sends just the
253  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
254  * buffer and may also be difficult to allocate and save a very large
255  * request buffer for each open. (bug 5707)
256  *
257  * OOM here may cause recovery failure if lmm is needed (only for the
258  * original open if the MDS crashed just when this client also OOM'd)
259  * but this is incredibly unlikely, and questionable whether the client
260  * could do MDS recovery under OOM anyways... */
261 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
262                                 struct mdt_body *body)
263 {
264         int     rc;
265
266         /* FIXME: remove this explicit offset. */
267         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
268                                         body->eadatasize);
269         if (rc) {
270                 CERROR("Can't enlarge segment %d size to %d\n",
271                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
272                 body->valid &= ~OBD_MD_FLEASIZE;
273                 body->eadatasize = 0;
274         }
275 }
276
277 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
278                                                    struct lookup_intent *it,
279                                                    struct md_op_data *op_data,
280                                                    void *lmm, int lmmsize,
281                                                    void *cb_data)
282 {
283         struct ptlrpc_request *req;
284         struct obd_device     *obddev = class_exp2obd(exp);
285         struct ldlm_intent    *lit;
286         CFS_LIST_HEAD(cancels);
287         int                    count = 0;
288         int                    mode;
289         int                    rc;
290         ENTRY;
291
292         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
293
294         /* XXX: openlock is not cancelled for cross-refs. */
295         /* If inode is known, cancel conflicting OPEN locks. */
296         if (fid_is_sane(&op_data->op_fid2)) {
297                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                         mode = LCK_CW;
299 #ifdef FMODE_EXEC
300                 else if (it->it_flags & FMODE_EXEC)
301                         mode = LCK_PR;
302 #endif
303                 else
304                         mode = LCK_CR;
305                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306                                                 &cancels, mode,
307                                                 MDS_INODELOCK_OPEN);
308         }
309
310         /* If CREATE, cancel parent's UPDATE lock. */
311         if (it->it_op & IT_CREAT)
312                 mode = LCK_EX;
313         else
314                 mode = LCK_CR;
315         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
316                                          &cancels, mode,
317                                          MDS_INODELOCK_UPDATE);
318
319         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
320                                    &RQF_LDLM_INTENT_OPEN);
321         if (req == NULL) {
322                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
323                 RETURN(ERR_PTR(-ENOMEM));
324         }
325
326         /* parent capability */
327         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
328         /* child capability, reserve the size according to parent capa, it will
329          * be filled after we get the reply */
330         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
331
332         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
333                              op_data->op_namelen + 1);
334         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
335                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
336
337         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
338         if (rc) {
339                 ptlrpc_request_free(req);
340                 return NULL;
341         }
342
343         spin_lock(&req->rq_lock);
344         req->rq_replay = req->rq_import->imp_replayable;
345         spin_unlock(&req->rq_lock);
346
347         /* pack the intent */
348         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349         lit->opc = (__u64)it->it_op;
350
351         /* pack the intended request */
352         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
353                       lmmsize);
354
355         /* for remote client, fetch remote perm for current user */
356         if (client_is_remote(exp))
357                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
358                                      sizeof(struct mdt_remote_perm));
359         ptlrpc_request_set_replen(req);
360         return req;
361 }
362
363 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
364                                                      struct lookup_intent *it,
365                                                      struct md_op_data *op_data)
366 {
367         struct ptlrpc_request *req;
368         struct obd_device     *obddev = class_exp2obd(exp);
369         struct ldlm_intent    *lit;
370         int                    rc;
371         ENTRY;
372
373         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
374                                    &RQF_LDLM_INTENT_UNLINK);
375         if (req == NULL)
376                 RETURN(ERR_PTR(-ENOMEM));
377
378         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
379         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
380                              op_data->op_namelen + 1);
381
382         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(ERR_PTR(rc));
386         }
387
388         /* pack the intent */
389         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
390         lit->opc = (__u64)it->it_op;
391
392         /* pack the intended request */
393         mdc_unlink_pack(req, op_data);
394
395         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
396                              obddev->u.cli.cl_max_mds_easize);
397         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
398                              obddev->u.cli.cl_max_mds_cookiesize);
399         ptlrpc_request_set_replen(req);
400         RETURN(req);
401 }
402
403 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
404                                                       struct lookup_intent *it,
405                                                       struct md_op_data *op_data)
406 {
407         struct ptlrpc_request *req;
408         struct obd_device     *obddev = class_exp2obd(exp);
409         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
410                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
411                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
412                                        (client_is_remote(exp) ?
413                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
414         struct ldlm_intent    *lit;
415         int                    rc;
416         ENTRY;
417
418         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
419                                    &RQF_LDLM_INTENT_GETATTR);
420         if (req == NULL)
421                 RETURN(ERR_PTR(-ENOMEM));
422
423         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
424         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425                              op_data->op_namelen + 1);
426
427         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
428         if (rc) {
429                 ptlrpc_request_free(req);
430                 RETURN(ERR_PTR(rc));
431         }
432
433         /* pack the intent */
434         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435         lit->opc = (__u64)it->it_op;
436
437         /* pack the intended request */
438         mdc_getattr_pack(req, valid, it->it_flags, op_data,
439                          obddev->u.cli.cl_max_mds_easize);
440
441         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
442                              obddev->u.cli.cl_max_mds_easize);
443         if (client_is_remote(exp))
444                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445                                      sizeof(struct mdt_remote_perm));
446         ptlrpc_request_set_replen(req);
447         RETURN(req);
448 }
449
450 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
451                                                      struct lookup_intent *it,
452                                                      struct md_op_data *unused)
453 {
454         struct obd_device     *obd = class_exp2obd(exp);
455         struct ptlrpc_request *req;
456         struct ldlm_intent    *lit;
457         struct layout_intent  *layout;
458         int rc;
459         ENTRY;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                 &RQF_LDLM_INTENT_LAYOUT);
463         if (req == NULL)
464                 RETURN(ERR_PTR(-ENOMEM));
465
466         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
467         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
468         if (rc) {
469                 ptlrpc_request_free(req);
470                 RETURN(ERR_PTR(rc));
471         }
472
473         /* pack the intent */
474         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
475         lit->opc = (__u64)it->it_op;
476
477         /* pack the layout intent request */
478         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
479         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
480          * set for replication */
481         layout->li_opc = LAYOUT_INTENT_ACCESS;
482
483         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
484                         obd->u.cli.cl_max_mds_easize);
485         ptlrpc_request_set_replen(req);
486         RETURN(req);
487 }
488
489 static struct ptlrpc_request *
490 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
491 {
492         struct ptlrpc_request *req;
493         int rc;
494         ENTRY;
495
496         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
497         if (req == NULL)
498                 RETURN(ERR_PTR(-ENOMEM));
499
500         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
501         if (rc) {
502                 ptlrpc_request_free(req);
503                 RETURN(ERR_PTR(rc));
504         }
505
506         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
507         ptlrpc_request_set_replen(req);
508         RETURN(req);
509 }
510
511 static int mdc_finish_enqueue(struct obd_export *exp,
512                               struct ptlrpc_request *req,
513                               struct ldlm_enqueue_info *einfo,
514                               struct lookup_intent *it,
515                               struct lustre_handle *lockh,
516                               int rc)
517 {
518         struct req_capsule  *pill = &req->rq_pill;
519         struct ldlm_request *lockreq;
520         struct ldlm_reply   *lockrep;
521         struct lustre_intent_data *intent = &it->d.lustre;
522         struct ldlm_lock    *lock;
523         void                *lvb_data = NULL;
524         int                  lvb_len = 0;
525         ENTRY;
526
527         LASSERT(rc >= 0);
528         /* Similarly, if we're going to replay this request, we don't want to
529          * actually get a lock, just perform the intent. */
530         if (req->rq_transno || req->rq_replay) {
531                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
532                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
533         }
534
535         if (rc == ELDLM_LOCK_ABORTED) {
536                 einfo->ei_mode = 0;
537                 memset(lockh, 0, sizeof(*lockh));
538                 rc = 0;
539         } else { /* rc = 0 */
540                 lock = ldlm_handle2lock(lockh);
541                 LASSERT(lock != NULL);
542
543                 /* If the server gave us back a different lock mode, we should
544                  * fix up our variables. */
545                 if (lock->l_req_mode != einfo->ei_mode) {
546                         ldlm_lock_addref(lockh, lock->l_req_mode);
547                         ldlm_lock_decref(lockh, einfo->ei_mode);
548                         einfo->ei_mode = lock->l_req_mode;
549                 }
550                 LDLM_LOCK_PUT(lock);
551         }
552
553         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
554         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
555
556         intent->it_disposition = (int)lockrep->lock_policy_res1;
557         intent->it_status = (int)lockrep->lock_policy_res2;
558         intent->it_lock_mode = einfo->ei_mode;
559         intent->it_lock_handle = lockh->cookie;
560         intent->it_data = req;
561
562         /* Technically speaking rq_transno must already be zero if
563          * it_status is in error, so the check is a bit redundant */
564         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
565                 mdc_clear_replay_flag(req, intent->it_status);
566
567         /* If we're doing an IT_OPEN which did not result in an actual
568          * successful open, then we need to remove the bit which saves
569          * this request for unconditional replay.
570          *
571          * It's important that we do this first!  Otherwise we might exit the
572          * function without doing so, and try to replay a failed create
573          * (bug 3440) */
574         if (it->it_op & IT_OPEN && req->rq_replay &&
575             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
576                 mdc_clear_replay_flag(req, intent->it_status);
577
578         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
579                   it->it_op, intent->it_disposition, intent->it_status);
580
581         /* We know what to expect, so we do any byte flipping required here */
582         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
583                 struct mdt_body *body;
584
585                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
586                 if (body == NULL) {
587                         CERROR ("Can't swab mdt_body\n");
588                         RETURN (-EPROTO);
589                 }
590
591                 if (it_disposition(it, DISP_OPEN_OPEN) &&
592                     !it_open_error(DISP_OPEN_OPEN, it)) {
593                         /*
594                          * If this is a successful OPEN request, we need to set
595                          * replay handler and data early, so that if replay
596                          * happens immediately after swabbing below, new reply
597                          * is swabbed by that handler correctly.
598                          */
599                         mdc_set_open_replay_data(NULL, NULL, req);
600                 }
601
602                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
603                         void *eadata;
604
605                         mdc_update_max_ea_from_body(exp, body);
606
607                         /*
608                          * The eadata is opaque; just check that it is there.
609                          * Eventually, obd_unpackmd() will check the contents.
610                          */
611                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
612                                                               body->eadatasize);
613                         if (eadata == NULL)
614                                 RETURN(-EPROTO);
615
616                         /* save lvb data and length in case this is for layout
617                          * lock */
618                         lvb_data = eadata;
619                         lvb_len = body->eadatasize;
620
621                         /*
622                          * We save the reply LOV EA in case we have to replay a
623                          * create for recovery.  If we didn't allocate a large
624                          * enough request buffer above we need to reallocate it
625                          * here to hold the actual LOV EA.
626                          *
627                          * To not save LOV EA if request is not going to replay
628                          * (for example error one).
629                          */
630                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
631                                 void *lmm;
632                                 if (req_capsule_get_size(pill, &RMF_EADATA,
633                                                          RCL_CLIENT) <
634                                     body->eadatasize)
635                                         mdc_realloc_openmsg(req, body);
636                                 else
637                                         req_capsule_shrink(pill, &RMF_EADATA,
638                                                            body->eadatasize,
639                                                            RCL_CLIENT);
640
641                                 req_capsule_set_size(pill, &RMF_EADATA,
642                                                      RCL_CLIENT,
643                                                      body->eadatasize);
644
645                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
646                                 if (lmm)
647                                         memcpy(lmm, eadata, body->eadatasize);
648                         }
649                 }
650
651                 if (body->valid & OBD_MD_FLRMTPERM) {
652                         struct mdt_remote_perm *perm;
653
654                         LASSERT(client_is_remote(exp));
655                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
656                                                 lustre_swab_mdt_remote_perm);
657                         if (perm == NULL)
658                                 RETURN(-EPROTO);
659                 }
660                 if (body->valid & OBD_MD_FLMDSCAPA) {
661                         struct lustre_capa *capa, *p;
662
663                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
664                         if (capa == NULL)
665                                 RETURN(-EPROTO);
666
667                         if (it->it_op & IT_OPEN) {
668                                 /* client fid capa will be checked in replay */
669                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
670                                 LASSERT(p);
671                                 *p = *capa;
672                         }
673                 }
674                 if (body->valid & OBD_MD_FLOSSCAPA) {
675                         struct lustre_capa *capa;
676
677                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
678                         if (capa == NULL)
679                                 RETURN(-EPROTO);
680                 }
681         } else if (it->it_op & IT_LAYOUT) {
682                 /* maybe the lock was granted right away and layout
683                  * is packed into RMF_DLM_LVB of req */
684                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
685                 if (lvb_len > 0) {
686                         lvb_data = req_capsule_server_sized_get(pill,
687                                                         &RMF_DLM_LVB, lvb_len);
688                         if (lvb_data == NULL)
689                                 RETURN(-EPROTO);
690                 }
691         }
692
693         /* fill in stripe data for layout lock */
694         lock = ldlm_handle2lock(lockh);
695         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
696                 void *lmm;
697
698                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
699                         ldlm_it2str(it->it_op), lvb_len);
700
701                 OBD_ALLOC_LARGE(lmm, lvb_len);
702                 if (lmm == NULL) {
703                         LDLM_LOCK_PUT(lock);
704                         RETURN(-ENOMEM);
705                 }
706                 memcpy(lmm, lvb_data, lvb_len);
707
708                 /* install lvb_data */
709                 lock_res_and_lock(lock);
710                 if (lock->l_lvb_data == NULL) {
711                         lock->l_lvb_data = lmm;
712                         lock->l_lvb_len = lvb_len;
713                         lmm = NULL;
714                 }
715                 unlock_res_and_lock(lock);
716                 if (lmm != NULL)
717                         OBD_FREE_LARGE(lmm, lvb_len);
718         }
719         if (lock != NULL)
720                 LDLM_LOCK_PUT(lock);
721
722         RETURN(rc);
723 }
724
725 /* We always reserve enough space in the reply packet for a stripe MD, because
726  * we don't know in advance the file type. */
727 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
728                 struct lookup_intent *it, struct md_op_data *op_data,
729                 struct lustre_handle *lockh, void *lmm, int lmmsize,
730                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
731 {
732         struct obd_device     *obddev = class_exp2obd(exp);
733         struct ptlrpc_request *req = NULL;
734         __u64                  flags, saved_flags = extra_lock_flags;
735         int                    rc;
736         struct ldlm_res_id res_id;
737         static const ldlm_policy_data_t lookup_policy =
738                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739         static const ldlm_policy_data_t update_policy =
740                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
741         static const ldlm_policy_data_t layout_policy =
742                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743         ldlm_policy_data_t const *policy = &lookup_policy;
744         int                    generation, resends = 0;
745         struct ldlm_reply     *lockrep;
746         enum lvb_type          lvb_type = 0;
747         ENTRY;
748
749         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
750                  einfo->ei_type);
751
752         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
753
754         if (it) {
755                 saved_flags |= LDLM_FL_HAS_INTENT;
756                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
757                         policy = &update_policy;
758                 else if (it->it_op & IT_LAYOUT)
759                         policy = &layout_policy;
760         }
761
762         LASSERT(reqp == NULL);
763
764         generation = obddev->u.cli.cl_import->imp_generation;
765 resend:
766         flags = saved_flags;
767         if (!it) {
768                 /* The only way right now is FLOCK, in this case we hide flock
769                    policy as lmm, but lmmsize is 0 */
770                 LASSERT(lmm && lmmsize == 0);
771                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
772                          einfo->ei_type);
773                 policy = (ldlm_policy_data_t *)lmm;
774                 res_id.name[3] = LDLM_FLOCK;
775         } else if (it->it_op & IT_OPEN) {
776                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
777                                            einfo->ei_cbdata);
778                 policy = &update_policy;
779                 einfo->ei_cbdata = NULL;
780                 lmm = NULL;
781         } else if (it->it_op & IT_UNLINK) {
782                 req = mdc_intent_unlink_pack(exp, it, op_data);
783         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
784                 req = mdc_intent_getattr_pack(exp, it, op_data);
785         } else if (it->it_op & IT_READDIR) {
786                 req = mdc_enqueue_pack(exp, 0);
787         } else if (it->it_op & IT_LAYOUT) {
788                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
789                         RETURN(-EOPNOTSUPP);
790
791                 req = mdc_intent_layout_pack(exp, it, op_data);
792                 lvb_type = LVB_T_LAYOUT;
793         } else {
794                 LBUG();
795                 RETURN(-EINVAL);
796         }
797
798         if (IS_ERR(req))
799                 RETURN(PTR_ERR(req));
800
801         if (req != NULL && it && it->it_op & IT_CREAT)
802                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
803                  * retry logic */
804                 req->rq_no_retry_einprogress = 1;
805
806         if (resends) {
807                 req->rq_generation_set = 1;
808                 req->rq_import_generation = generation;
809                 req->rq_sent = cfs_time_current_sec() + resends;
810         }
811
812         /* It is important to obtain rpc_lock first (if applicable), so that
813          * threads that are serialised with rpc_lock are not polluting our
814          * rpcs in flight counter. We do not do flock request limiting, though*/
815         if (it) {
816                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
817                 rc = mdc_enter_request(&obddev->u.cli);
818                 if (rc != 0) {
819                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
820                         mdc_clear_replay_flag(req, 0);
821                         ptlrpc_req_finished(req);
822                         RETURN(rc);
823                 }
824         }
825
826         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
827                               0, lvb_type, lockh, 0);
828         if (!it) {
829                 /* For flock requests we immediatelly return without further
830                    delay and let caller deal with the rest, since rest of
831                    this function metadata processing makes no sense for flock
832                    requests anyway */
833                 RETURN(rc);
834         }
835
836         mdc_exit_request(&obddev->u.cli);
837         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
838
839         if (rc < 0) {
840                 CERROR("ldlm_cli_enqueue: %d\n", rc);
841                 mdc_clear_replay_flag(req, rc);
842                 ptlrpc_req_finished(req);
843                 RETURN(rc);
844         }
845
846         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
847         LASSERT(lockrep != NULL);
848
849         lockrep->lock_policy_res2 =
850                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
851
852         /* Retry the create infinitely when we get -EINPROGRESS from
853          * server. This is required by the new quota design. */
854         if (it && it->it_op & IT_CREAT &&
855             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
856                 mdc_clear_replay_flag(req, rc);
857                 ptlrpc_req_finished(req);
858                 resends++;
859
860                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
861                        obddev->obd_name, resends, it->it_op,
862                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
863
864                 if (generation == obddev->u.cli.cl_import->imp_generation) {
865                         goto resend;
866                 } else {
867                         CDEBUG(D_HA, "resend cross eviction\n");
868                         RETURN(-EIO);
869                 }
870         }
871
872         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
873         if (rc < 0) {
874                 if (lustre_handle_is_used(lockh)) {
875                         ldlm_lock_decref(lockh, einfo->ei_mode);
876                         memset(lockh, 0, sizeof(*lockh));
877                 }
878                 ptlrpc_req_finished(req);
879         }
880         RETURN(rc);
881 }
882
883 static int mdc_finish_intent_lock(struct obd_export *exp,
884                                   struct ptlrpc_request *request,
885                                   struct md_op_data *op_data,
886                                   struct lookup_intent *it,
887                                   struct lustre_handle *lockh)
888 {
889         struct lustre_handle old_lock;
890         struct mdt_body *mdt_body;
891         struct ldlm_lock *lock;
892         int rc;
893         ENTRY;
894
895         LASSERT(request != NULL);
896         LASSERT(request != LP_POISON);
897         LASSERT(request->rq_repmsg != LP_POISON);
898
899         if (!it_disposition(it, DISP_IT_EXECD)) {
900                 /* The server failed before it even started executing the
901                  * intent, i.e. because it couldn't unpack the request. */
902                 LASSERT(it->d.lustre.it_status != 0);
903                 RETURN(it->d.lustre.it_status);
904         }
905         rc = it_open_error(DISP_IT_EXECD, it);
906         if (rc)
907                 RETURN(rc);
908
909         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
910         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
911
912         /* If we were revalidating a fid/name pair, mark the intent in
913          * case we fail and get called again from lookup */
914         if (fid_is_sane(&op_data->op_fid2) &&
915             it->it_create_mode & M_CHECK_STALE &&
916             it->it_op != IT_GETATTR) {
917                 it_set_disposition(it, DISP_ENQ_COMPLETE);
918
919                 /* Also: did we find the same inode? */
920                 /* sever can return one of two fids:
921                  * op_fid2 - new allocated fid - if file is created.
922                  * op_fid3 - existent fid - if file only open.
923                  * op_fid3 is saved in lmv_intent_open */
924                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
925                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
926                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
927                                "\n", PFID(&op_data->op_fid2),
928                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
929                         RETURN(-ESTALE);
930                 }
931         }
932
933         rc = it_open_error(DISP_LOOKUP_EXECD, it);
934         if (rc)
935                 RETURN(rc);
936
937         /* keep requests around for the multiple phases of the call
938          * this shows the DISP_XX must guarantee we make it into the call
939          */
940         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
941             it_disposition(it, DISP_OPEN_CREATE) &&
942             !it_open_error(DISP_OPEN_CREATE, it)) {
943                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
944                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
945         }
946         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
947             it_disposition(it, DISP_OPEN_OPEN) &&
948             !it_open_error(DISP_OPEN_OPEN, it)) {
949                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
950                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
951                 /* BUG 11546 - eviction in the middle of open rpc processing */
952                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
953         }
954
955         if (it->it_op & IT_CREAT) {
956                 /* XXX this belongs in ll_create_it */
957         } else if (it->it_op == IT_OPEN) {
958                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
959         } else {
960                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
961         }
962
963         /* If we already have a matching lock, then cancel the new
964          * one.  We have to set the data here instead of in
965          * mdc_enqueue, because we need to use the child's inode as
966          * the l_ast_data to match, and that's not available until
967          * intent_finish has performed the iget().) */
968         lock = ldlm_handle2lock(lockh);
969         if (lock) {
970                 ldlm_policy_data_t policy = lock->l_policy_data;
971                 LDLM_DEBUG(lock, "matching against this");
972
973                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
974                                          &lock->l_resource->lr_name),
975                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
976                          (unsigned long)lock->l_resource->lr_name.name[0],
977                          (unsigned long)lock->l_resource->lr_name.name[1],
978                          (unsigned long)lock->l_resource->lr_name.name[2],
979                          (unsigned long)fid_seq(&mdt_body->fid1),
980                          (unsigned long)fid_oid(&mdt_body->fid1),
981                          (unsigned long)fid_ver(&mdt_body->fid1));
982                 LDLM_LOCK_PUT(lock);
983
984                 memcpy(&old_lock, lockh, sizeof(*lockh));
985                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
986                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
987                         ldlm_lock_decref_and_cancel(lockh,
988                                                     it->d.lustre.it_lock_mode);
989                         memcpy(lockh, &old_lock, sizeof(old_lock));
990                         it->d.lustre.it_lock_handle = lockh->cookie;
991                 }
992         }
993         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
994                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
995                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
996         RETURN(rc);
997 }
998
999 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1000                         struct lu_fid *fid, __u64 *bits)
1001 {
1002         /* We could just return 1 immediately, but since we should only
1003          * be called in revalidate_it if we already have a lock, let's
1004          * verify that. */
1005         struct ldlm_res_id res_id;
1006         struct lustre_handle lockh;
1007         ldlm_policy_data_t policy;
1008         ldlm_mode_t mode;
1009         ENTRY;
1010
1011         if (it->d.lustre.it_lock_handle) {
1012                 lockh.cookie = it->d.lustre.it_lock_handle;
1013                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1014         } else {
1015                 fid_build_reg_res_name(fid, &res_id);
1016                 switch (it->it_op) {
1017                 case IT_GETATTR:
1018                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1019                         break;
1020                 case IT_LAYOUT:
1021                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1022                         break;
1023                 default:
1024                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1025                         break;
1026                 }
1027                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1028                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1029                                        LDLM_IBITS, &policy,
1030                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1031         }
1032
1033         if (mode) {
1034                 it->d.lustre.it_lock_handle = lockh.cookie;
1035                 it->d.lustre.it_lock_mode = mode;
1036         } else {
1037                 it->d.lustre.it_lock_handle = 0;
1038                 it->d.lustre.it_lock_mode = 0;
1039         }
1040
1041         RETURN(!!mode);
1042 }
1043
1044 /*
1045  * This long block is all about fixing up the lock and request state
1046  * so that it is correct as of the moment _before_ the operation was
1047  * applied; that way, the VFS will think that everything is normal and
1048  * call Lustre's regular VFS methods.
1049  *
1050  * If we're performing a creation, that means that unless the creation
1051  * failed with EEXIST, we should fake up a negative dentry.
1052  *
1053  * For everything else, we want to lookup to succeed.
1054  *
1055  * One additional note: if CREATE or OPEN succeeded, we add an extra
1056  * reference to the request because we need to keep it around until
1057  * ll_create/ll_open gets called.
1058  *
1059  * The server will return to us, in it_disposition, an indication of
1060  * exactly what d.lustre.it_status refers to.
1061  *
1062  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1063  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1064  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1065  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1066  * was successful.
1067  *
1068  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1069  * child lookup.
1070  */
1071 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1072                     void *lmm, int lmmsize, struct lookup_intent *it,
1073                     int lookup_flags, struct ptlrpc_request **reqp,
1074                     ldlm_blocking_callback cb_blocking,
1075                     __u64 extra_lock_flags)
1076 {
1077         struct lustre_handle lockh;
1078         int rc = 0;
1079         ENTRY;
1080         LASSERT(it);
1081
1082         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1083                ", intent: %s flags %#o\n", op_data->op_namelen,
1084                op_data->op_name, PFID(&op_data->op_fid2),
1085                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1086                it->it_flags);
1087
1088         lockh.cookie = 0;
1089         if (fid_is_sane(&op_data->op_fid2) &&
1090             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1091                 /* We could just return 1 immediately, but since we should only
1092                  * be called in revalidate_it if we already have a lock, let's
1093                  * verify that. */
1094                 it->d.lustre.it_lock_handle = 0;
1095                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1096                 /* Only return failure if it was not GETATTR by cfid
1097                    (from inode_revalidate) */
1098                 if (rc || op_data->op_namelen != 0)
1099                         RETURN(rc);
1100         }
1101
1102         /* lookup_it may be called only after revalidate_it has run, because
1103          * revalidate_it cannot return errors, only zero.  Returning zero causes
1104          * this call to lookup, which *can* return an error.
1105          *
1106          * We only want to execute the request associated with the intent one
1107          * time, however, so don't send the request again.  Instead, skip past
1108          * this and use the request from revalidate.  In this case, revalidate
1109          * never dropped its reference, so the refcounts are all OK */
1110         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1111                 struct ldlm_enqueue_info einfo =
1112                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1113                           ldlm_completion_ast, NULL, NULL, NULL };
1114
1115                 /* For case if upper layer did not alloc fid, do it now. */
1116                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1117                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1118                         if (rc < 0) {
1119                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1120                                 RETURN(rc);
1121                         }
1122                 }
1123                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1124                                  lmm, lmmsize, NULL, extra_lock_flags);
1125                 if (rc < 0)
1126                         RETURN(rc);
1127         } else if (!fid_is_sane(&op_data->op_fid2) ||
1128                    !(it->it_create_mode & M_CHECK_STALE)) {
1129                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1130                  * request referenced from this intent, saved for subsequent
1131                  * lookup.  This path is executed when we proceed to this
1132                  * lookup, so we clear DISP_ENQ_COMPLETE */
1133                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1134         }
1135         *reqp = it->d.lustre.it_data;
1136         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1137         RETURN(rc);
1138 }
1139
1140 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1141                                               struct ptlrpc_request *req,
1142                                               void *args, int rc)
1143 {
1144         struct mdc_getattr_args  *ga = args;
1145         struct obd_export        *exp = ga->ga_exp;
1146         struct md_enqueue_info   *minfo = ga->ga_minfo;
1147         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1148         struct lookup_intent     *it;
1149         struct lustre_handle     *lockh;
1150         struct obd_device        *obddev;
1151         struct ldlm_reply        *lockrep;
1152         __u64                     flags = LDLM_FL_HAS_INTENT;
1153         ENTRY;
1154
1155         it    = &minfo->mi_it;
1156         lockh = &minfo->mi_lockh;
1157
1158         obddev = class_exp2obd(exp);
1159
1160         mdc_exit_request(&obddev->u.cli);
1161         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1162                 rc = -ETIMEDOUT;
1163
1164         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1165                                    &flags, NULL, 0, lockh, rc);
1166         if (rc < 0) {
1167                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1168                 mdc_clear_replay_flag(req, rc);
1169                 GOTO(out, rc);
1170         }
1171
1172         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1173         LASSERT(lockrep != NULL);
1174
1175         lockrep->lock_policy_res2 =
1176                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1177
1178         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1179         if (rc)
1180                 GOTO(out, rc);
1181
1182         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1183         EXIT;
1184
1185 out:
1186         OBD_FREE_PTR(einfo);
1187         minfo->mi_cb(req, minfo, rc);
1188         return 0;
1189 }
1190
1191 int mdc_intent_getattr_async(struct obd_export *exp,
1192                              struct md_enqueue_info *minfo,
1193                              struct ldlm_enqueue_info *einfo)
1194 {
1195         struct md_op_data       *op_data = &minfo->mi_data;
1196         struct lookup_intent    *it = &minfo->mi_it;
1197         struct ptlrpc_request   *req;
1198         struct mdc_getattr_args *ga;
1199         struct obd_device       *obddev = class_exp2obd(exp);
1200         struct ldlm_res_id       res_id;
1201         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1202          *     for statahead currently. Consider CMD in future, such two bits
1203          *     maybe managed by different MDS, should be adjusted then. */
1204         ldlm_policy_data_t       policy = {
1205                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1206                                                          MDS_INODELOCK_UPDATE }
1207                                  };
1208         int                      rc = 0;
1209         __u64                    flags = LDLM_FL_HAS_INTENT;
1210         ENTRY;
1211
1212         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1213                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1214                ldlm_it2str(it->it_op), it->it_flags);
1215
1216         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1217         req = mdc_intent_getattr_pack(exp, it, op_data);
1218         if (!req)
1219                 RETURN(-ENOMEM);
1220
1221         rc = mdc_enter_request(&obddev->u.cli);
1222         if (rc != 0) {
1223                 ptlrpc_req_finished(req);
1224                 RETURN(rc);
1225         }
1226
1227         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1228                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1229         if (rc < 0) {
1230                 mdc_exit_request(&obddev->u.cli);
1231                 ptlrpc_req_finished(req);
1232                 RETURN(rc);
1233         }
1234
1235         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1236         ga = ptlrpc_req_async_args(req);
1237         ga->ga_exp = exp;
1238         ga->ga_minfo = minfo;
1239         ga->ga_einfo = einfo;
1240
1241         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1242         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1243
1244         RETURN(0);
1245 }