Whamcloud - gitweb
LU-2665 mdc: Keep resend FLocks
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/* Three pointers bundled together: an export, an md enqueue info and an
 * ldlm enqueue info.  NOTE(review): presumably the argument block of the
 * asynchronous getattr path; its users are outside this part of the file. */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;
        struct md_enqueue_info   *ga_minfo;
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         struct inode *new_inode = data;
122         ENTRY;
123
124         if(bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 RETURN(0);
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134 #ifdef __KERNEL__
135         if (lock->l_resource->lr_lvb_inode &&
136             lock->l_resource->lr_lvb_inode != data) {
137                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: "
140                          "setting data to %p/%lu/%u\n", old_inode,
141                          old_inode->i_ino, old_inode->i_generation,
142                          old_inode->i_state,
143                          new_inode, new_inode->i_ino, new_inode->i_generation);
144         }
145 #endif
146         lock->l_resource->lr_lvb_inode = new_inode;
147         if (bits)
148                 *bits = lock->l_policy_data.l_inodebits.bits;
149
150         unlock_res_and_lock(lock);
151         LDLM_LOCK_PUT(lock);
152
153         RETURN(0);
154 }
155
156 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
157                            const struct lu_fid *fid, ldlm_type_t type,
158                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
159                            struct lustre_handle *lockh)
160 {
161         struct ldlm_res_id res_id;
162         ldlm_mode_t rc;
163         ENTRY;
164
165         fid_build_reg_res_name(fid, &res_id);
166         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
167                              &res_id, type, policy, mode, lockh, 0);
168         RETURN(rc);
169 }
170
171 int mdc_cancel_unused(struct obd_export *exp,
172                       const struct lu_fid *fid,
173                       ldlm_policy_data_t *policy,
174                       ldlm_mode_t mode,
175                       ldlm_cancel_flags_t flags,
176                       void *opaque)
177 {
178         struct ldlm_res_id res_id;
179         struct obd_device *obd = class_exp2obd(exp);
180         int rc;
181
182         ENTRY;
183
184         fid_build_reg_res_name(fid, &res_id);
185         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
186                                              policy, mode, flags, opaque);
187         RETURN(rc);
188 }
189
190 int mdc_null_inode(struct obd_export *exp,
191                    const struct lu_fid *fid)
192 {
193         struct ldlm_res_id res_id;
194         struct ldlm_resource *res;
195         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
196         ENTRY;
197
198         LASSERTF(ns != NULL, "no namespace passed\n");
199
200         fid_build_reg_res_name(fid, &res_id);
201
202         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
203         if(res == NULL)
204                 RETURN(0);
205
206         lock_res(res);
207         res->lr_lvb_inode = NULL;
208         unlock_res(res);
209
210         ldlm_resource_putref(res);
211         RETURN(0);
212 }
213
214 /* find any ldlm lock of the inode in mdc
215  * return 0    not find
216  *        1    find one
217  *      < 0    error */
218 int mdc_find_cbdata(struct obd_export *exp,
219                     const struct lu_fid *fid,
220                     ldlm_iterator_t it, void *data)
221 {
222         struct ldlm_res_id res_id;
223         int rc = 0;
224         ENTRY;
225
226         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
227         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
228                                    it, data);
229         if (rc == LDLM_ITER_STOP)
230                 RETURN(1);
231         else if (rc == LDLM_ITER_CONTINUE)
232                 RETURN(0);
233         RETURN(rc);
234 }
235
236 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
237 {
238         /* Don't hold error requests for replay. */
239         if (req->rq_replay) {
240                 spin_lock(&req->rq_lock);
241                 req->rq_replay = 0;
242                 spin_unlock(&req->rq_lock);
243         }
244         if (rc && req->rq_transno != 0) {
245                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
246                 LBUG();
247         }
248 }
249
250 /* Save a large LOV EA into the request buffer so that it is available
251  * for replay.  We don't do this in the initial request because the
252  * original request doesn't need this buffer (at most it sends just the
253  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
254  * buffer and may also be difficult to allocate and save a very large
255  * request buffer for each open. (bug 5707)
256  *
257  * OOM here may cause recovery failure if lmm is needed (only for the
258  * original open if the MDS crashed just when this client also OOM'd)
259  * but this is incredibly unlikely, and questionable whether the client
260  * could do MDS recovery under OOM anyways... */
261 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
262                                 struct mdt_body *body)
263 {
264         int     rc;
265
266         /* FIXME: remove this explicit offset. */
267         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
268                                         body->eadatasize);
269         if (rc) {
270                 CERROR("Can't enlarge segment %d size to %d\n",
271                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
272                 body->valid &= ~OBD_MD_FLEASIZE;
273                 body->eadatasize = 0;
274         }
275 }
276
277 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
278                                                    struct lookup_intent *it,
279                                                    struct md_op_data *op_data,
280                                                    void *lmm, int lmmsize,
281                                                    void *cb_data)
282 {
283         struct ptlrpc_request *req;
284         struct obd_device     *obddev = class_exp2obd(exp);
285         struct ldlm_intent    *lit;
286         CFS_LIST_HEAD(cancels);
287         int                    count = 0;
288         int                    mode;
289         int                    rc;
290         ENTRY;
291
292         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
293
294         /* XXX: openlock is not cancelled for cross-refs. */
295         /* If inode is known, cancel conflicting OPEN locks. */
296         if (fid_is_sane(&op_data->op_fid2)) {
297                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                         mode = LCK_CW;
299 #ifdef FMODE_EXEC
300                 else if (it->it_flags & FMODE_EXEC)
301                         mode = LCK_PR;
302 #endif
303                 else
304                         mode = LCK_CR;
305                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306                                                 &cancels, mode,
307                                                 MDS_INODELOCK_OPEN);
308         }
309
310         /* If CREATE, cancel parent's UPDATE lock. */
311         if (it->it_op & IT_CREAT)
312                 mode = LCK_EX;
313         else
314                 mode = LCK_CR;
315         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
316                                          &cancels, mode,
317                                          MDS_INODELOCK_UPDATE);
318
319         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
320                                    &RQF_LDLM_INTENT_OPEN);
321         if (req == NULL) {
322                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
323                 RETURN(ERR_PTR(-ENOMEM));
324         }
325
326         /* parent capability */
327         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
328         /* child capability, reserve the size according to parent capa, it will
329          * be filled after we get the reply */
330         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
331
332         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
333                              op_data->op_namelen + 1);
334         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
335                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
336
337         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
338         if (rc) {
339                 ptlrpc_request_free(req);
340                 return NULL;
341         }
342
343         spin_lock(&req->rq_lock);
344         req->rq_replay = req->rq_import->imp_replayable;
345         spin_unlock(&req->rq_lock);
346
347         /* pack the intent */
348         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349         lit->opc = (__u64)it->it_op;
350
351         /* pack the intended request */
352         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
353                       lmmsize);
354
355         /* for remote client, fetch remote perm for current user */
356         if (client_is_remote(exp))
357                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
358                                      sizeof(struct mdt_remote_perm));
359         ptlrpc_request_set_replen(req);
360         return req;
361 }
362
363 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
364                                                      struct lookup_intent *it,
365                                                      struct md_op_data *op_data)
366 {
367         struct ptlrpc_request *req;
368         struct obd_device     *obddev = class_exp2obd(exp);
369         struct ldlm_intent    *lit;
370         int                    rc;
371         ENTRY;
372
373         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
374                                    &RQF_LDLM_INTENT_UNLINK);
375         if (req == NULL)
376                 RETURN(ERR_PTR(-ENOMEM));
377
378         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
379         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
380                              op_data->op_namelen + 1);
381
382         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(ERR_PTR(rc));
386         }
387
388         /* pack the intent */
389         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
390         lit->opc = (__u64)it->it_op;
391
392         /* pack the intended request */
393         mdc_unlink_pack(req, op_data);
394
395         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
396                              obddev->u.cli.cl_max_mds_easize);
397         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
398                              obddev->u.cli.cl_max_mds_cookiesize);
399         ptlrpc_request_set_replen(req);
400         RETURN(req);
401 }
402
403 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
404                                                       struct lookup_intent *it,
405                                                       struct md_op_data *op_data)
406 {
407         struct ptlrpc_request *req;
408         struct obd_device     *obddev = class_exp2obd(exp);
409         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
410                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
411                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
412                                        (client_is_remote(exp) ?
413                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
414         struct ldlm_intent    *lit;
415         int                    rc;
416         ENTRY;
417
418         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
419                                    &RQF_LDLM_INTENT_GETATTR);
420         if (req == NULL)
421                 RETURN(ERR_PTR(-ENOMEM));
422
423         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
424         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425                              op_data->op_namelen + 1);
426
427         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
428         if (rc) {
429                 ptlrpc_request_free(req);
430                 RETURN(ERR_PTR(rc));
431         }
432
433         /* pack the intent */
434         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435         lit->opc = (__u64)it->it_op;
436
437         /* pack the intended request */
438         mdc_getattr_pack(req, valid, it->it_flags, op_data,
439                          obddev->u.cli.cl_max_mds_easize);
440
441         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
442                              obddev->u.cli.cl_max_mds_easize);
443         if (client_is_remote(exp))
444                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445                                      sizeof(struct mdt_remote_perm));
446         ptlrpc_request_set_replen(req);
447         RETURN(req);
448 }
449
450 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
451                                                      struct lookup_intent *it,
452                                                      struct md_op_data *unused)
453 {
454         struct obd_device     *obd = class_exp2obd(exp);
455         struct ptlrpc_request *req;
456         struct ldlm_intent    *lit;
457         struct layout_intent  *layout;
458         int rc;
459         ENTRY;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                 &RQF_LDLM_INTENT_LAYOUT);
463         if (req == NULL)
464                 RETURN(ERR_PTR(-ENOMEM));
465
466         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
467         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
468         if (rc) {
469                 ptlrpc_request_free(req);
470                 RETURN(ERR_PTR(rc));
471         }
472
473         /* pack the intent */
474         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
475         lit->opc = (__u64)it->it_op;
476
477         /* pack the layout intent request */
478         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
479         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
480          * set for replication */
481         layout->li_opc = LAYOUT_INTENT_ACCESS;
482
483         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
484                         obd->u.cli.cl_max_mds_easize);
485         ptlrpc_request_set_replen(req);
486         RETURN(req);
487 }
488
489 static struct ptlrpc_request *
490 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
491 {
492         struct ptlrpc_request *req;
493         int rc;
494         ENTRY;
495
496         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
497         if (req == NULL)
498                 RETURN(ERR_PTR(-ENOMEM));
499
500         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
501         if (rc) {
502                 ptlrpc_request_free(req);
503                 RETURN(ERR_PTR(rc));
504         }
505
506         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
507         ptlrpc_request_set_replen(req);
508         RETURN(req);
509 }
510
511 static int mdc_finish_enqueue(struct obd_export *exp,
512                               struct ptlrpc_request *req,
513                               struct ldlm_enqueue_info *einfo,
514                               struct lookup_intent *it,
515                               struct lustre_handle *lockh,
516                               int rc)
517 {
518         struct req_capsule  *pill = &req->rq_pill;
519         struct ldlm_request *lockreq;
520         struct ldlm_reply   *lockrep;
521         struct lustre_intent_data *intent = &it->d.lustre;
522         struct ldlm_lock    *lock;
523         void                *lvb_data = NULL;
524         int                  lvb_len = 0;
525         ENTRY;
526
527         LASSERT(rc >= 0);
528         /* Similarly, if we're going to replay this request, we don't want to
529          * actually get a lock, just perform the intent. */
530         if (req->rq_transno || req->rq_replay) {
531                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
532                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
533         }
534
535         if (rc == ELDLM_LOCK_ABORTED) {
536                 einfo->ei_mode = 0;
537                 memset(lockh, 0, sizeof(*lockh));
538                 rc = 0;
539         } else { /* rc = 0 */
540                 lock = ldlm_handle2lock(lockh);
541                 LASSERT(lock != NULL);
542
543                 /* If the server gave us back a different lock mode, we should
544                  * fix up our variables. */
545                 if (lock->l_req_mode != einfo->ei_mode) {
546                         ldlm_lock_addref(lockh, lock->l_req_mode);
547                         ldlm_lock_decref(lockh, einfo->ei_mode);
548                         einfo->ei_mode = lock->l_req_mode;
549                 }
550                 LDLM_LOCK_PUT(lock);
551         }
552
553         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
554         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
555
556         intent->it_disposition = (int)lockrep->lock_policy_res1;
557         intent->it_status = (int)lockrep->lock_policy_res2;
558         intent->it_lock_mode = einfo->ei_mode;
559         intent->it_lock_handle = lockh->cookie;
560         intent->it_data = req;
561
562         /* Technically speaking rq_transno must already be zero if
563          * it_status is in error, so the check is a bit redundant */
564         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
565                 mdc_clear_replay_flag(req, intent->it_status);
566
567         /* If we're doing an IT_OPEN which did not result in an actual
568          * successful open, then we need to remove the bit which saves
569          * this request for unconditional replay.
570          *
571          * It's important that we do this first!  Otherwise we might exit the
572          * function without doing so, and try to replay a failed create
573          * (bug 3440) */
574         if (it->it_op & IT_OPEN && req->rq_replay &&
575             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
576                 mdc_clear_replay_flag(req, intent->it_status);
577
578         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
579                   it->it_op, intent->it_disposition, intent->it_status);
580
581         /* We know what to expect, so we do any byte flipping required here */
582         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
583                 struct mdt_body *body;
584
585                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
586                 if (body == NULL) {
587                         CERROR ("Can't swab mdt_body\n");
588                         RETURN (-EPROTO);
589                 }
590
591                 if (it_disposition(it, DISP_OPEN_OPEN) &&
592                     !it_open_error(DISP_OPEN_OPEN, it)) {
593                         /*
594                          * If this is a successful OPEN request, we need to set
595                          * replay handler and data early, so that if replay
596                          * happens immediately after swabbing below, new reply
597                          * is swabbed by that handler correctly.
598                          */
599                         mdc_set_open_replay_data(NULL, NULL, req);
600                 }
601
602                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
603                         void *eadata;
604
605                         mdc_update_max_ea_from_body(exp, body);
606
607                         /*
608                          * The eadata is opaque; just check that it is there.
609                          * Eventually, obd_unpackmd() will check the contents.
610                          */
611                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
612                                                               body->eadatasize);
613                         if (eadata == NULL)
614                                 RETURN(-EPROTO);
615
616                         /* save lvb data and length in case this is for layout
617                          * lock */
618                         lvb_data = eadata;
619                         lvb_len = body->eadatasize;
620
621                         /*
622                          * We save the reply LOV EA in case we have to replay a
623                          * create for recovery.  If we didn't allocate a large
624                          * enough request buffer above we need to reallocate it
625                          * here to hold the actual LOV EA.
626                          *
627                          * To not save LOV EA if request is not going to replay
628                          * (for example error one).
629                          */
630                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
631                                 void *lmm;
632                                 if (req_capsule_get_size(pill, &RMF_EADATA,
633                                                          RCL_CLIENT) <
634                                     body->eadatasize)
635                                         mdc_realloc_openmsg(req, body);
636                                 else
637                                         req_capsule_shrink(pill, &RMF_EADATA,
638                                                            body->eadatasize,
639                                                            RCL_CLIENT);
640
641                                 req_capsule_set_size(pill, &RMF_EADATA,
642                                                      RCL_CLIENT,
643                                                      body->eadatasize);
644
645                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
646                                 if (lmm)
647                                         memcpy(lmm, eadata, body->eadatasize);
648                         }
649                 }
650
651                 if (body->valid & OBD_MD_FLRMTPERM) {
652                         struct mdt_remote_perm *perm;
653
654                         LASSERT(client_is_remote(exp));
655                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
656                                                 lustre_swab_mdt_remote_perm);
657                         if (perm == NULL)
658                                 RETURN(-EPROTO);
659                 }
660                 if (body->valid & OBD_MD_FLMDSCAPA) {
661                         struct lustre_capa *capa, *p;
662
663                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
664                         if (capa == NULL)
665                                 RETURN(-EPROTO);
666
667                         if (it->it_op & IT_OPEN) {
668                                 /* client fid capa will be checked in replay */
669                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
670                                 LASSERT(p);
671                                 *p = *capa;
672                         }
673                 }
674                 if (body->valid & OBD_MD_FLOSSCAPA) {
675                         struct lustre_capa *capa;
676
677                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
678                         if (capa == NULL)
679                                 RETURN(-EPROTO);
680                 }
681         } else if (it->it_op & IT_LAYOUT) {
682                 /* maybe the lock was granted right away and layout
683                  * is packed into RMF_DLM_LVB of req */
684                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
685                 if (lvb_len > 0) {
686                         lvb_data = req_capsule_server_sized_get(pill,
687                                                         &RMF_DLM_LVB, lvb_len);
688                         if (lvb_data == NULL)
689                                 RETURN(-EPROTO);
690                 }
691         }
692
693         /* fill in stripe data for layout lock */
694         lock = ldlm_handle2lock(lockh);
695         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
696                 void *lmm;
697
698                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
699                         ldlm_it2str(it->it_op), lvb_len);
700
701                 OBD_ALLOC_LARGE(lmm, lvb_len);
702                 if (lmm == NULL) {
703                         LDLM_LOCK_PUT(lock);
704                         RETURN(-ENOMEM);
705                 }
706                 memcpy(lmm, lvb_data, lvb_len);
707
708                 /* install lvb_data */
709                 lock_res_and_lock(lock);
710                 if (lock->l_lvb_data == NULL) {
711                         lock->l_lvb_data = lmm;
712                         lock->l_lvb_len = lvb_len;
713                         lmm = NULL;
714                 }
715                 unlock_res_and_lock(lock);
716                 if (lmm != NULL)
717                         OBD_FREE_LARGE(lmm, lvb_len);
718         }
719         if (lock != NULL)
720                 LDLM_LOCK_PUT(lock);
721
722         RETURN(rc);
723 }
724
725 /* We always reserve enough space in the reply packet for a stripe MD, because
726  * we don't know in advance the file type. */
727 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
728                 struct lookup_intent *it, struct md_op_data *op_data,
729                 struct lustre_handle *lockh, void *lmm, int lmmsize,
730                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
731 {
732         struct obd_device     *obddev = class_exp2obd(exp);
733         struct ptlrpc_request *req = NULL;
734         __u64                  flags, saved_flags = extra_lock_flags;
735         int                    rc;
736         struct ldlm_res_id res_id;
737         static const ldlm_policy_data_t lookup_policy =
738                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739         static const ldlm_policy_data_t update_policy =
740                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
741         static const ldlm_policy_data_t layout_policy =
742                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743         ldlm_policy_data_t const *policy = &lookup_policy;
744         int                    generation, resends = 0;
745         struct ldlm_reply     *lockrep;
746         enum lvb_type          lvb_type = 0;
747         ENTRY;
748
749         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
750                  einfo->ei_type);
751
752         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
753
754         if (it) {
755                 saved_flags |= LDLM_FL_HAS_INTENT;
756                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
757                         policy = &update_policy;
758                 else if (it->it_op & IT_LAYOUT)
759                         policy = &layout_policy;
760         }
761
762         LASSERT(reqp == NULL);
763
764         generation = obddev->u.cli.cl_import->imp_generation;
765 resend:
766         flags = saved_flags;
767         if (!it) {
768                 /* The only way right now is FLOCK, in this case we hide flock
769                    policy as lmm, but lmmsize is 0 */
770                 LASSERT(lmm && lmmsize == 0);
771                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
772                          einfo->ei_type);
773                 policy = (ldlm_policy_data_t *)lmm;
774                 res_id.name[3] = LDLM_FLOCK;
775         } else if (it->it_op & IT_OPEN) {
776                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
777                                            einfo->ei_cbdata);
778                 policy = &update_policy;
779                 einfo->ei_cbdata = NULL;
780                 lmm = NULL;
781         } else if (it->it_op & IT_UNLINK) {
782                 req = mdc_intent_unlink_pack(exp, it, op_data);
783         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
784                 req = mdc_intent_getattr_pack(exp, it, op_data);
785         } else if (it->it_op & IT_READDIR) {
786                 req = mdc_enqueue_pack(exp, 0);
787         } else if (it->it_op & IT_LAYOUT) {
788                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
789                         RETURN(-EOPNOTSUPP);
790
791                 req = mdc_intent_layout_pack(exp, it, op_data);
792                 lvb_type = LVB_T_LAYOUT;
793         } else {
794                 LBUG();
795                 RETURN(-EINVAL);
796         }
797
798         if (IS_ERR(req))
799                 RETURN(PTR_ERR(req));
800
801         if (req != NULL && it && it->it_op & IT_CREAT)
802                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
803                  * retry logic */
804                 req->rq_no_retry_einprogress = 1;
805
806         if (resends) {
807                 req->rq_generation_set = 1;
808                 req->rq_import_generation = generation;
809                 req->rq_sent = cfs_time_current_sec() + resends;
810         }
811
812         /* It is important to obtain rpc_lock first (if applicable), so that
813          * threads that are serialised with rpc_lock are not polluting our
814          * rpcs in flight counter. We do not do flock request limiting, though*/
815         if (it) {
816                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
817                 rc = mdc_enter_request(&obddev->u.cli);
818                 if (rc != 0) {
819                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
820                         mdc_clear_replay_flag(req, 0);
821                         ptlrpc_req_finished(req);
822                         RETURN(rc);
823                 }
824         }
825
826         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
827                               0, lvb_type, lockh, 0);
828         if (!it) {
829                 /* For flock requests we immediatelly return without further
830                    delay and let caller deal with the rest, since rest of
831                    this function metadata processing makes no sense for flock
832                    requests anyway. But in case of problem during comms with
833                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
834                    can not rely on caller and this mainly for F_UNLCKs
835                    (explicits or automatically generated by Kernel to clean
836                    current FLocks upon exit) that can't be trashed */
837                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
838                         goto resend;
839                 RETURN(rc);
840         }
841
842         mdc_exit_request(&obddev->u.cli);
843         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
844
845         if (rc < 0) {
846                 CERROR("ldlm_cli_enqueue: %d\n", rc);
847                 mdc_clear_replay_flag(req, rc);
848                 ptlrpc_req_finished(req);
849                 RETURN(rc);
850         }
851
852         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
853         LASSERT(lockrep != NULL);
854
855         /* Retry the create infinitely when we get -EINPROGRESS from
856          * server. This is required by the new quota design. */
857         if (it && it->it_op & IT_CREAT &&
858             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
859                 mdc_clear_replay_flag(req, rc);
860                 ptlrpc_req_finished(req);
861                 resends++;
862
863                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
864                        obddev->obd_name, resends, it->it_op,
865                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
866
867                 if (generation == obddev->u.cli.cl_import->imp_generation) {
868                         goto resend;
869                 } else {
870                         CDEBUG(D_HA, "resend cross eviction\n");
871                         RETURN(-EIO);
872                 }
873         }
874
875         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
876         if (rc < 0) {
877                 if (lustre_handle_is_used(lockh)) {
878                         ldlm_lock_decref(lockh, einfo->ei_mode);
879                         memset(lockh, 0, sizeof(*lockh));
880                 }
881                 ptlrpc_req_finished(req);
882         }
883         RETURN(rc);
884 }
885
886 static int mdc_finish_intent_lock(struct obd_export *exp,
887                                   struct ptlrpc_request *request,
888                                   struct md_op_data *op_data,
889                                   struct lookup_intent *it,
890                                   struct lustre_handle *lockh)
891 {
892         struct lustre_handle old_lock;
893         struct mdt_body *mdt_body;
894         struct ldlm_lock *lock;
895         int rc;
896
897
898         LASSERT(request != NULL);
899         LASSERT(request != LP_POISON);
900         LASSERT(request->rq_repmsg != LP_POISON);
901
902         if (!it_disposition(it, DISP_IT_EXECD)) {
903                 /* The server failed before it even started executing the
904                  * intent, i.e. because it couldn't unpack the request. */
905                 LASSERT(it->d.lustre.it_status != 0);
906                 RETURN(it->d.lustre.it_status);
907         }
908         rc = it_open_error(DISP_IT_EXECD, it);
909         if (rc)
910                 RETURN(rc);
911
912         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
913         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
914
915         /* If we were revalidating a fid/name pair, mark the intent in
916          * case we fail and get called again from lookup */
917         if (fid_is_sane(&op_data->op_fid2) &&
918             it->it_create_mode & M_CHECK_STALE &&
919             it->it_op != IT_GETATTR) {
920                 it_set_disposition(it, DISP_ENQ_COMPLETE);
921
922                 /* Also: did we find the same inode? */
923                 /* sever can return one of two fids:
924                  * op_fid2 - new allocated fid - if file is created.
925                  * op_fid3 - existent fid - if file only open.
926                  * op_fid3 is saved in lmv_intent_open */
927                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
928                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
929                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
930                                "\n", PFID(&op_data->op_fid2),
931                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
932                         RETURN(-ESTALE);
933                 }
934         }
935
936         rc = it_open_error(DISP_LOOKUP_EXECD, it);
937         if (rc)
938                 RETURN(rc);
939
940         /* keep requests around for the multiple phases of the call
941          * this shows the DISP_XX must guarantee we make it into the call
942          */
943         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
944             it_disposition(it, DISP_OPEN_CREATE) &&
945             !it_open_error(DISP_OPEN_CREATE, it)) {
946                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
947                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
948         }
949         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
950             it_disposition(it, DISP_OPEN_OPEN) &&
951             !it_open_error(DISP_OPEN_OPEN, it)) {
952                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
953                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
954                 /* BUG 11546 - eviction in the middle of open rpc processing */
955                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
956         }
957
958         if (it->it_op & IT_CREAT) {
959                 /* XXX this belongs in ll_create_it */
960         } else if (it->it_op == IT_OPEN) {
961                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
962         } else {
963                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
964         }
965
966         /* If we already have a matching lock, then cancel the new
967          * one.  We have to set the data here instead of in
968          * mdc_enqueue, because we need to use the child's inode as
969          * the l_ast_data to match, and that's not available until
970          * intent_finish has performed the iget().) */
971         lock = ldlm_handle2lock(lockh);
972         if (lock) {
973                 ldlm_policy_data_t policy = lock->l_policy_data;
974                 LDLM_DEBUG(lock, "matching against this");
975
976                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
977                                          &lock->l_resource->lr_name),
978                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
979                          (unsigned long)lock->l_resource->lr_name.name[0],
980                          (unsigned long)lock->l_resource->lr_name.name[1],
981                          (unsigned long)lock->l_resource->lr_name.name[2],
982                          (unsigned long)fid_seq(&mdt_body->fid1),
983                          (unsigned long)fid_oid(&mdt_body->fid1),
984                          (unsigned long)fid_ver(&mdt_body->fid1));
985                 LDLM_LOCK_PUT(lock);
986
987                 memcpy(&old_lock, lockh, sizeof(*lockh));
988                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
989                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
990                         ldlm_lock_decref_and_cancel(lockh,
991                                                     it->d.lustre.it_lock_mode);
992                         memcpy(lockh, &old_lock, sizeof(old_lock));
993                         it->d.lustre.it_lock_handle = lockh->cookie;
994                 }
995         }
996         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
997                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
998                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
999         RETURN(rc);
1000 }
1001
1002 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1003                         struct lu_fid *fid, __u64 *bits)
1004 {
1005         /* We could just return 1 immediately, but since we should only
1006          * be called in revalidate_it if we already have a lock, let's
1007          * verify that. */
1008         struct ldlm_res_id res_id;
1009         struct lustre_handle lockh;
1010         ldlm_policy_data_t policy;
1011         ldlm_mode_t mode;
1012         ENTRY;
1013
1014         if (it->d.lustre.it_lock_handle) {
1015                 lockh.cookie = it->d.lustre.it_lock_handle;
1016                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1017         } else {
1018                 fid_build_reg_res_name(fid, &res_id);
1019                 switch (it->it_op) {
1020                 case IT_GETATTR:
1021                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1022                         break;
1023                 case IT_LAYOUT:
1024                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1025                         break;
1026                 default:
1027                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1028                         break;
1029                 }
1030                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1031                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1032                                        LDLM_IBITS, &policy,
1033                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1034         }
1035
1036         if (mode) {
1037                 it->d.lustre.it_lock_handle = lockh.cookie;
1038                 it->d.lustre.it_lock_mode = mode;
1039         } else {
1040                 it->d.lustre.it_lock_handle = 0;
1041                 it->d.lustre.it_lock_mode = 0;
1042         }
1043
1044         RETURN(!!mode);
1045 }
1046
1047 /*
1048  * This long block is all about fixing up the lock and request state
1049  * so that it is correct as of the moment _before_ the operation was
1050  * applied; that way, the VFS will think that everything is normal and
1051  * call Lustre's regular VFS methods.
1052  *
1053  * If we're performing a creation, that means that unless the creation
1054  * failed with EEXIST, we should fake up a negative dentry.
1055  *
1056  * For everything else, we want to lookup to succeed.
1057  *
1058  * One additional note: if CREATE or OPEN succeeded, we add an extra
1059  * reference to the request because we need to keep it around until
1060  * ll_create/ll_open gets called.
1061  *
1062  * The server will return to us, in it_disposition, an indication of
1063  * exactly what d.lustre.it_status refers to.
1064  *
1065  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1066  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1067  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1068  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1069  * was successful.
1070  *
1071  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1072  * child lookup.
1073  */
1074 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1075                     void *lmm, int lmmsize, struct lookup_intent *it,
1076                     int lookup_flags, struct ptlrpc_request **reqp,
1077                     ldlm_blocking_callback cb_blocking,
1078                     __u64 extra_lock_flags)
1079 {
1080         struct lustre_handle lockh;
1081         int rc = 0;
1082         ENTRY;
1083         LASSERT(it);
1084
1085         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1086                ", intent: %s flags %#o\n", op_data->op_namelen,
1087                op_data->op_name, PFID(&op_data->op_fid2),
1088                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1089                it->it_flags);
1090
1091         lockh.cookie = 0;
1092         if (fid_is_sane(&op_data->op_fid2) &&
1093             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1094                 /* We could just return 1 immediately, but since we should only
1095                  * be called in revalidate_it if we already have a lock, let's
1096                  * verify that. */
1097                 it->d.lustre.it_lock_handle = 0;
1098                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1099                 /* Only return failure if it was not GETATTR by cfid
1100                    (from inode_revalidate) */
1101                 if (rc || op_data->op_namelen != 0)
1102                         RETURN(rc);
1103         }
1104
1105         /* lookup_it may be called only after revalidate_it has run, because
1106          * revalidate_it cannot return errors, only zero.  Returning zero causes
1107          * this call to lookup, which *can* return an error.
1108          *
1109          * We only want to execute the request associated with the intent one
1110          * time, however, so don't send the request again.  Instead, skip past
1111          * this and use the request from revalidate.  In this case, revalidate
1112          * never dropped its reference, so the refcounts are all OK */
1113         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1114                 struct ldlm_enqueue_info einfo =
1115                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1116                           ldlm_completion_ast, NULL, NULL, NULL };
1117
1118                 /* For case if upper layer did not alloc fid, do it now. */
1119                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1120                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1121                         if (rc < 0) {
1122                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1123                                 RETURN(rc);
1124                         }
1125                 }
1126                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1127                                  lmm, lmmsize, NULL, extra_lock_flags);
1128                 if (rc < 0)
1129                         RETURN(rc);
1130         } else if (!fid_is_sane(&op_data->op_fid2) ||
1131                    !(it->it_create_mode & M_CHECK_STALE)) {
1132                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1133                  * request referenced from this intent, saved for subsequent
1134                  * lookup.  This path is executed when we proceed to this
1135                  * lookup, so we clear DISP_ENQ_COMPLETE */
1136                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1137         }
1138         *reqp = it->d.lustre.it_data;
1139         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1140         RETURN(rc);
1141 }
1142
1143 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1144                                               struct ptlrpc_request *req,
1145                                               void *args, int rc)
1146 {
1147         struct mdc_getattr_args  *ga = args;
1148         struct obd_export        *exp = ga->ga_exp;
1149         struct md_enqueue_info   *minfo = ga->ga_minfo;
1150         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1151         struct lookup_intent     *it;
1152         struct lustre_handle     *lockh;
1153         struct obd_device        *obddev;
1154         __u64                     flags = LDLM_FL_HAS_INTENT;
1155         ENTRY;
1156
1157         it    = &minfo->mi_it;
1158         lockh = &minfo->mi_lockh;
1159
1160         obddev = class_exp2obd(exp);
1161
1162         mdc_exit_request(&obddev->u.cli);
1163         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1164                 rc = -ETIMEDOUT;
1165
1166         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1167                                    &flags, NULL, 0, lockh, rc);
1168         if (rc < 0) {
1169                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1170                 mdc_clear_replay_flag(req, rc);
1171                 GOTO(out, rc);
1172         }
1173
1174         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1175         if (rc)
1176                 GOTO(out, rc);
1177
1178         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1179         EXIT;
1180
1181 out:
1182         OBD_FREE_PTR(einfo);
1183         minfo->mi_cb(req, minfo, rc);
1184         return 0;
1185 }
1186
1187 int mdc_intent_getattr_async(struct obd_export *exp,
1188                              struct md_enqueue_info *minfo,
1189                              struct ldlm_enqueue_info *einfo)
1190 {
1191         struct md_op_data       *op_data = &minfo->mi_data;
1192         struct lookup_intent    *it = &minfo->mi_it;
1193         struct ptlrpc_request   *req;
1194         struct mdc_getattr_args *ga;
1195         struct obd_device       *obddev = class_exp2obd(exp);
1196         struct ldlm_res_id       res_id;
1197         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1198          *     for statahead currently. Consider CMD in future, such two bits
1199          *     maybe managed by different MDS, should be adjusted then. */
1200         ldlm_policy_data_t       policy = {
1201                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1202                                                          MDS_INODELOCK_UPDATE }
1203                                  };
1204         int                      rc = 0;
1205         __u64                    flags = LDLM_FL_HAS_INTENT;
1206         ENTRY;
1207
1208         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1209                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1210                ldlm_it2str(it->it_op), it->it_flags);
1211
1212         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1213         req = mdc_intent_getattr_pack(exp, it, op_data);
1214         if (!req)
1215                 RETURN(-ENOMEM);
1216
1217         rc = mdc_enter_request(&obddev->u.cli);
1218         if (rc != 0) {
1219                 ptlrpc_req_finished(req);
1220                 RETURN(rc);
1221         }
1222
1223         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1224                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1225         if (rc < 0) {
1226                 mdc_exit_request(&obddev->u.cli);
1227                 ptlrpc_req_finished(req);
1228                 RETURN(rc);
1229         }
1230
1231         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1232         ga = ptlrpc_req_async_args(req);
1233         ga->ga_exp = exp;
1234         ga->ga_minfo = minfo;
1235         ga->ga_einfo = einfo;
1236
1237         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1238         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1239
1240         RETURN(0);
1241 }