/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_MDC

#ifdef __KERNEL__
# include <linux/module.h>
# include <linux/pagemap.h>
# include <linux/miscdevice.h>
# include <linux/init.h>
#else
# include <liblustre.h>
#endif

#include <lustre_acl.h>
#include <obd_class.h>
#include <lustre_dlm.h>
/* fid_res_name_eq() */
#include <lustre_fid.h>
#include <lprocfs_status.h>
#include "mdc_internal.h"

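/* Arguments saved in rq_async_args and passed back to
 * mdc_intent_getattr_async_interpret() when the async getattr RPC completes. */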
struct mdc_getattr_args {
        struct obd_export           *ga_exp;
        struct md_enqueue_info      *ga_minfo;
        struct ldlm_enqueue_info    *ga_einfo;
};

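/* Helpers for the DISP_* disposition bits that the MDS sets in the intent
 * reply to record how far intent execution progressed. */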
int it_disposition(struct lookup_intent *it, int flag)
{
        return it->d.lustre.it_disposition & flag;
}
EXPORT_SYMBOL(it_disposition);

void it_set_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition |= flag;
}
EXPORT_SYMBOL(it_set_disposition);

void it_clear_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition &= ~flag;
}
EXPORT_SYMBOL(it_clear_disposition);

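/* Return the error (if any) that applies to the given phase of intent
 * execution: phases that completed before the last step the server executed
 * return 0; the last executed phase and any later ones return it_status. */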
int it_open_error(int phase, struct lookup_intent *it)
{
        if (it_disposition(it, DISP_OPEN_OPEN)) {
                if (phase >= DISP_OPEN_OPEN)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_OPEN_CREATE)) {
                if (phase >= DISP_OPEN_CREATE)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_LOOKUP_EXECD)) {
                if (phase >= DISP_LOOKUP_EXECD)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_IT_EXECD)) {
                if (phase >= DISP_IT_EXECD)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }
        CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
               it->d.lustre.it_status);
        LBUG();
        return 0;
}
EXPORT_SYMBOL(it_open_error);

/* this must be called on a lockh that is known to have a referenced lock */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
                      __u64 *bits)
{
        struct ldlm_lock *lock;
        struct inode *new_inode = data;
        ENTRY;

        if (bits)
                *bits = 0;

        if (!*lockh)
                RETURN(0);

        lock = ldlm_handle2lock((struct lustre_handle *)lockh);

        LASSERT(lock != NULL);
        lock_res_and_lock(lock);
#ifdef __KERNEL__
        if (lock->l_resource->lr_lvb_inode &&
            lock->l_resource->lr_lvb_inode != data) {
                struct inode *old_inode = lock->l_resource->lr_lvb_inode;
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_resource->lr_lvb_inode = new_inode;
        if (bits)
                *bits = lock->l_policy_data.l_inodebits.bits;

        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}

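/* Look for an already granted lock on the resource derived from @fid that
 * matches the given type, policy and mode; returns the matched mode (or 0)
 * and fills in *lockh. */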
ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
                           const struct lu_fid *fid, ldlm_type_t type,
                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
                           struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id;
        ldlm_mode_t rc;
        ENTRY;

        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                             &res_id, type, policy, mode, lockh, 0);
        RETURN(rc);
}

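/* Cancel all unused locks on the resource derived from @fid that match the
 * given policy bits and mode. */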
int mdc_cancel_unused(struct obd_export *exp,
                      const struct lu_fid *fid,
                      ldlm_policy_data_t *policy,
                      ldlm_mode_t mode,
                      ldlm_cancel_flags_t flags,
                      void *opaque)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        ENTRY;

        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
                                             policy, mode, flags, opaque);
        RETURN(rc);
}

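/* Clear the cached inode pointer (lr_lvb_inode) on the resource derived from
 * @fid, if such a resource exists. */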
int mdc_null_inode(struct obd_export *exp,
                   const struct lu_fid *fid)
{
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
        ENTRY;

        LASSERTF(ns != NULL, "no namespace passed\n");

        fid_build_reg_res_name(fid, &res_id);

        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        lock_res(res);
        res->lr_lvb_inode = NULL;
        unlock_res(res);

        ldlm_resource_putref(res);
        RETURN(0);
}

/* Find any ldlm lock of the inode in mdc.
 * Return: 0 if no lock was found,
 *         1 if a lock was found,
 *       < 0 on error. */
int mdc_find_cbdata(struct obd_export *exp,
                    const struct lu_fid *fid,
                    ldlm_iterator_t it, void *data)
{
        struct ldlm_res_id res_id;
        int rc = 0;
        ENTRY;

        fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
        rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
                                   it, data);
        if (rc == LDLM_ITER_STOP)
                RETURN(1);
        else if (rc == LDLM_ITER_CONTINUE)
                RETURN(0);
        RETURN(rc);
}

static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
        /* Don't hold error requests for replay. */
        if (req->rq_replay) {
                spin_lock(&req->rq_lock);
                req->rq_replay = 0;
                spin_unlock(&req->rq_lock);
        }
        if (rc && req->rq_transno != 0) {
                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
                LBUG();
        }
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md), so it would be a waste of RAM/bandwidth to send an empty
 * buffer, and it may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd),
 * but this is incredibly unlikely, and it is questionable whether the
 * client could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                                struct mdt_body *body)
{
        int     rc;

        /* FIXME: remove this explicit offset. */
        rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
                                        body->eadatasize);
        if (rc) {
                CERROR("Can't enlarge segment %d size to %d\n",
                       DLM_INTENT_REC_OFF + 4, body->eadatasize);
                body->valid &= ~OBD_MD_FLEASIZE;
                body->eadatasize = 0;
        }
}

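/* Allocate and pack an LDLM_INTENT_OPEN request: locally cancel conflicting
 * OPEN locks on the child and UPDATE locks on the parent, then pack the open
 * intent together with the create mode, open flags and EA data. */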
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct md_op_data *op_data,
                                                   void *lmm, int lmmsize,
                                                   void *cb_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ldlm_intent    *lit;
        CFS_LIST_HEAD(cancels);
        int                    count = 0;
        int                    mode;
        int                    rc;
        ENTRY;

        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

        /* XXX: openlock is not cancelled for cross-refs. */
        /* If inode is known, cancel conflicting OPEN locks. */
        if (fid_is_sane(&op_data->op_fid2)) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                        mode = LCK_CW;
#ifdef FMODE_EXEC
                else if (it->it_flags & FMODE_EXEC)
                        mode = LCK_PR;
#endif
                else
                        mode = LCK_CR;
                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
                                                &cancels, mode,
                                                MDS_INODELOCK_OPEN);
        }

        /* If CREATE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT)
                mode = LCK_EX;
        else
                mode = LCK_CR;
        count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                         &cancels, mode,
                                         MDS_INODELOCK_UPDATE);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_OPEN);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(ERR_PTR(-ENOMEM));
        }

        /* parent capability */
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        /* child capability, reserve the size according to parent capa, it will
         * be filled after we get the reply */
        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);

        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);
        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));

        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        spin_lock(&req->rq_lock);
        req->rq_replay = req->rq_import->imp_replayable;
        spin_unlock(&req->rq_lock);

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
                      lmmsize);

        /* for remote client, fetch remote perm for current user */
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
        return req;
}

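/* Allocate and pack an LDLM_INTENT_UNLINK request, reserving reply space for
 * the victim's EA and unlink cookies. */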
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct md_op_data *op_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ldlm_intent    *lit;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_UNLINK);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_unlink_pack(req, op_data);

        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_cookiesize);
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

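/* Allocate and pack an LDLM_INTENT_GETATTR request asking for the full set of
 * attributes (including EA and ACL or remote permissions), and reserve the
 * corresponding reply buffers. */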
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
                                                      struct lookup_intent *it,
                                                      struct md_op_data *op_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
                                       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
                                       OBD_MD_FLMDSCAPA | OBD_MD_MEA |
                                       (client_is_remote(exp) ?
                                               OBD_MD_FLRMTPERM : OBD_MD_FLACL);
        struct ldlm_intent    *lit;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_GETATTR);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_getattr_pack(req, valid, it->it_flags, op_data,
                         obddev->u.cli.cl_max_mds_easize);

        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

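/* Allocate and pack an LDLM_INTENT_LAYOUT request used to fetch a file's
 * layout under a layout lock; the layout comes back in the DLM LVB. */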
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct md_op_data *unused)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct ldlm_intent    *lit;
        struct layout_intent  *layout;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_LAYOUT);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the layout intent request */
        layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
        /* LAYOUT_INTENT_ACCESS is generic; a more specific operation will be
         * set for replication */
        layout->li_opc = LAYOUT_INTENT_ACCESS;

        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             obd->u.cli.cl_max_mds_easize);
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

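/* Allocate a plain LDLM_ENQUEUE request (no intent), reserving @lvb_len bytes
 * of reply space for the LVB. */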
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
        struct ptlrpc_request *req;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

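/* Post-process a completed intent enqueue: fix up the lock mode and replay
 * flags, copy the disposition and status out of the DLM reply into the
 * intent, save the reply EA for open replay, and install layout data into
 * the lock's LVB. */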
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct req_capsule  *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        struct lustre_intent_data *intent = &it->d.lustre;
        struct ldlm_lock    *lock;
        void                *lvb_data = NULL;
        int                  lvb_len = 0;
        ENTRY;

        LASSERT(rc >= 0);
        /* If we're going to replay this request, we don't want to actually
         * get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                lock = ldlm_handle2lock(lockh);
                LASSERT(lock != NULL);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                LDLM_LOCK_PUT(lock);
        }

        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

        intent->it_disposition = (int)lockrep->lock_policy_res1;
        intent->it_status = (int)lockrep->lock_policy_res2;
        intent->it_lock_mode = einfo->ei_mode;
        intent->it_lock_handle = lockh->cookie;
        intent->it_data = req;

        /* Technically speaking rq_transno must already be zero if
         * it_status is in error, so the check is a bit redundant */
        if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
                mdc_clear_replay_flag(req, intent->it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
                mdc_clear_replay_flag(req, intent->it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op, intent->it_disposition, intent->it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;

                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                if (body == NULL) {
                        CERROR("Can't swab mdt_body\n");
                        RETURN(-EPROTO);
                }

                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                }

                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        void *eadata;

                        mdc_update_max_ea_from_body(exp, body);

                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                              body->eadatasize);
                        if (eadata == NULL)
                                RETURN(-EPROTO);

                        /* save lvb data and length in case this is for layout
                         * lock */
                        lvb_data = eadata;
                        lvb_len = body->eadatasize;

                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If we didn't allocate a large
                         * enough request buffer above we need to reallocate it
                         * here to hold the actual LOV EA.
                         *
                         * Do not save the LOV EA if the request is not going
                         * to be replayed (for example, on error).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                void *lmm;
                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                                         RCL_CLIENT) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);
                                else
                                        req_capsule_shrink(pill, &RMF_EADATA,
                                                           body->eadatasize,
                                                           RCL_CLIENT);

                                req_capsule_set_size(pill, &RMF_EADATA,
                                                     RCL_CLIENT,
                                                     body->eadatasize);

                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }

                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;

                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                        if (perm == NULL)
                                RETURN(-EPROTO);
                }
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;

                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (capa == NULL)
                                RETURN(-EPROTO);

                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                LASSERT(p);
                                *p = *capa;
                        }
                }
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;

                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
                        if (capa == NULL)
                                RETURN(-EPROTO);
                }
        } else if (it->it_op & IT_LAYOUT) {
                /* maybe the lock was granted right away and layout
                 * is packed into RMF_DLM_LVB of req */
                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
                if (lvb_len > 0) {
                        lvb_data = req_capsule_server_sized_get(pill,
                                                        &RMF_DLM_LVB, lvb_len);
                        if (lvb_data == NULL)
                                RETURN(-EPROTO);
                }
        }

        /* fill in stripe data for layout lock */
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
                void *lmm;

                LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
                           ldlm_it2str(it->it_op), lvb_len);

                OBD_ALLOC_LARGE(lmm, lvb_len);
                if (lmm == NULL) {
                        LDLM_LOCK_PUT(lock);
                        RETURN(-ENOMEM);
                }
                memcpy(lmm, lvb_data, lvb_len);

                /* install lvb_data */
                lock_res_and_lock(lock);
                if (lock->l_lvb_data == NULL) {
                        lock->l_lvb_data = lmm;
                        lock->l_lvb_len = lvb_len;
                        lmm = NULL;
                }
                unlock_res_and_lock(lock);
                if (lmm != NULL)
                        OBD_FREE_LARGE(lmm, lvb_len);
        }
        if (lock != NULL)
                LDLM_LOCK_PUT(lock);

        RETURN(rc);
}

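/* Send an LDLM enqueue RPC with an intent attached according to it->it_op and
 * finish it via mdc_finish_enqueue().  Handles -EINPROGRESS resends for create
 * intents and the intent-less FLOCK case. */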
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, __u64 extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        __u64                  flags, saved_flags = extra_lock_flags;
        int                    rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const ldlm_policy_data_t layout_policy =
                            { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                    generation, resends = 0;
        struct ldlm_reply     *lockrep;
        enum lvb_type          lvb_type = 0;
        ENTRY;

        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it) {
                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
        }

        LASSERT(reqp == NULL);

        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        flags = saved_flags;
        if (!it) {
                /* The only intent-less case right now is FLOCK; the flock
                 * policy is hidden in lmm, and lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                einfo->ei_cbdata = NULL;
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_getattr_pack(exp, it, op_data);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
                        RETURN(-EOPNOTSUPP);

                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else {
                LBUG();
                RETURN(-EINVAL);
        }

        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (req != NULL && it && it->it_op & IT_CREAT)
                /* ask ptlrpc not to resend on EINPROGRESS since we have our own
                 * retry logic */
                req->rq_no_retry_einprogress = 1;

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter.  We do not do flock request limiting,
         * though. */
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);
        if (!it) {
                /* For flock requests we return immediately and let the caller
                 * deal with the rest, since the metadata processing in the
                 * remainder of this function makes no sense for flock
                 * requests.  However, if there was a problem communicating
                 * with the server (ETIMEDOUT) or a signal/kill attempt
                 * (EINTR), we cannot rely on the caller to retry; this
                 * matters mainly for F_UNLCKs (explicit, or generated
                 * automatically by the kernel to clean up current flocks on
                 * exit) that must not be discarded */
                if ((rc == -EINTR) || (rc == -ETIMEDOUT))
                        goto resend;
                RETURN(rc);
        }

        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        /* Retry the create infinitely when we get -EINPROGRESS from the
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc < 0) {
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        memset(lockh, 0, sizeof(*lockh));
                }
                ptlrpc_req_finished(req);
        }
        RETURN(rc);
}

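/* Interpret the intent reply for the VFS: check dispositions and statuses,
 * detect stale fid/name revalidations, keep a request reference for open and
 * create, and collapse the new lock onto an already matching one if found. */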
static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *request,
                                  struct md_op_data *op_data,
                                  struct lookup_intent *it,
                                  struct lustre_handle *lockh)
{
        struct lustre_handle old_lock;
        struct mdt_body *mdt_body;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        LASSERT(request != NULL);
        LASSERT(request != LP_POISON);
        LASSERT(request->rq_repmsg != LP_POISON);

        if (!it_disposition(it, DISP_IT_EXECD)) {
                /* The server failed before it even started executing the
                 * intent, i.e. because it couldn't unpack the request. */
                LASSERT(it->d.lustre.it_status != 0);
                RETURN(it->d.lustre.it_status);
        }
        rc = it_open_error(DISP_IT_EXECD, it);
        if (rc)
                RETURN(rc);

        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
        LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */

        /* If we were revalidating a fid/name pair, mark the intent in
         * case we fail and get called again from lookup */
        if (fid_is_sane(&op_data->op_fid2) &&
            it->it_create_mode & M_CHECK_STALE &&
            it->it_op != IT_GETATTR) {
                it_set_disposition(it, DISP_ENQ_COMPLETE);

                /* Also: did we find the same inode? */
                /* The server can return one of two fids:
                 * op_fid2 - newly allocated fid - if the file was created.
                 * op_fid3 - existing fid - if the file was only opened.
                 * op_fid3 is saved in lmv_intent_open */
                if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
                    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
                        CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
                               "\n", PFID(&op_data->op_fid2),
                               PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
                        RETURN(-ESTALE);
                }
        }

        rc = it_open_error(DISP_LOOKUP_EXECD, it);
        if (rc)
                RETURN(rc);

        /* Keep the request around for the multiple phases of the call;
         * the DISP_XX bits must guarantee we make it into that call. */
        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
            it_disposition(it, DISP_OPEN_CREATE) &&
            !it_open_error(DISP_OPEN_CREATE, it)) {
                it_set_disposition(it, DISP_ENQ_CREATE_REF);
                ptlrpc_request_addref(request); /* balanced in ll_create_node */
        }
        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
            it_disposition(it, DISP_OPEN_OPEN) &&
            !it_open_error(DISP_OPEN_OPEN, it)) {
                it_set_disposition(it, DISP_ENQ_OPEN_REF);
                ptlrpc_request_addref(request); /* balanced in ll_file_open */
                /* BUG 11546 - eviction in the middle of open rpc processing */
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
        }

        if (it->it_op & IT_CREAT) {
                /* XXX this belongs in ll_create_it */
        } else if (it->it_op == IT_OPEN) {
                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
        } else {
                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
        }

        /* If we already have a matching lock, then cancel the new
         * one.  We have to set the data here instead of in
         * mdc_enqueue, because we need to use the child's inode as
         * the l_ast_data to match, and that's not available until
         * intent_finish has performed the iget(). */
        lock = ldlm_handle2lock(lockh);
        if (lock) {
                ldlm_policy_data_t policy = lock->l_policy_data;
                LDLM_DEBUG(lock, "matching against this");

                LASSERTF(fid_res_name_eq(&mdt_body->fid1,
                                         &lock->l_resource->lr_name),
                         "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
                         (unsigned long)lock->l_resource->lr_name.name[0],
                         (unsigned long)lock->l_resource->lr_name.name[1],
                         (unsigned long)lock->l_resource->lr_name.name[2],
                         (unsigned long)fid_seq(&mdt_body->fid1),
                         (unsigned long)fid_oid(&mdt_body->fid1),
                         (unsigned long)fid_ver(&mdt_body->fid1));
                LDLM_LOCK_PUT(lock);

                memcpy(&old_lock, lockh, sizeof(*lockh));
                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                                    LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
                        ldlm_lock_decref_and_cancel(lockh,
                                                    it->d.lustre.it_lock_mode);
                        memcpy(lockh, &old_lock, sizeof(old_lock));
                        it->d.lustre.it_lock_handle = lockh->cookie;
                }
        }
        CDEBUG(D_DENTRY, "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
               op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
        RETURN(rc);
}

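/* Check whether the client still holds a lock covering this intent, either
 * via the handle stashed in the intent or by matching against the fid's
 * resource; on success, update the intent lock fields and return 1. */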
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                        struct lu_fid *fid, __u64 *bits)
{
        /* We could just return 1 immediately, but since we should only
         * be called in revalidate_it if we already have a lock, let's
         * verify that. */
        struct ldlm_res_id res_id;
        struct lustre_handle lockh;
        ldlm_policy_data_t policy;
        ldlm_mode_t mode;
        ENTRY;

        if (it->d.lustre.it_lock_handle) {
                lockh.cookie = it->d.lustre.it_lock_handle;
                mode = ldlm_revalidate_lock_handle(&lockh, bits);
        } else {
                fid_build_reg_res_name(fid, &res_id);
                switch (it->it_op) {
                case IT_GETATTR:
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                        break;
                case IT_LAYOUT:
                        policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
                        break;
                default:
                        policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
                        break;
                }
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED, &res_id,
                                       LDLM_IBITS, &policy,
                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
        }

        if (mode) {
                it->d.lustre.it_lock_handle = lockh.cookie;
                it->d.lustre.it_lock_mode = mode;
        } else {
                it->d.lustre.it_lock_handle = 0;
                it->d.lustre.it_lock_mode = 0;
        }

        RETURN(!!mode);
}

/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want to lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                    void *lmm, int lmmsize, struct lookup_intent *it,
                    int lookup_flags, struct ptlrpc_request **reqp,
                    ldlm_blocking_callback cb_blocking,
                    __u64 extra_lock_flags)
{
        struct lustre_handle lockh;
        int rc = 0;
        ENTRY;
        LASSERT(it);

        CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
               ", intent: %s flags %#o\n", op_data->op_namelen,
               op_data->op_name, PFID(&op_data->op_fid2),
               PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
               it->it_flags);

        lockh.cookie = 0;
        if (fid_is_sane(&op_data->op_fid2) &&
            (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
                /* We could just return 1 immediately, but since we should only
                 * be called in revalidate_it if we already have a lock, let's
                 * verify that. */
                it->d.lustre.it_lock_handle = 0;
                rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
                /* Only return failure if it was not GETATTR by cfid
                   (from inode_revalidate) */
                if (rc || op_data->op_namelen != 0)
                        RETURN(rc);
        }

        /* lookup_it may be called only after revalidate_it has run, because
         * revalidate_it cannot return errors, only zero.  Returning zero causes
         * this call to lookup, which *can* return an error.
         *
         * We only want to execute the request associated with the intent one
         * time, however, so don't send the request again.  Instead, skip past
         * this and use the request from revalidate.  In this case, revalidate
         * never dropped its reference, so the refcounts are all OK */
        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
                struct ldlm_enqueue_info einfo = {
                        .ei_type        = LDLM_IBITS,
                        .ei_mode        = it_to_lock_mode(it),
                        .ei_cb_bl       = cb_blocking,
                        .ei_cb_cp       = ldlm_completion_ast,
                };

                /* If the upper layer did not allocate a fid, do it now. */
                if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
                        rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
                        if (rc < 0) {
                                CERROR("Can't alloc new fid, rc %d\n", rc);
                                RETURN(rc);
                        }
                }
                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
                                 lmm, lmmsize, NULL, extra_lock_flags);
                if (rc < 0)
                        RETURN(rc);
        } else if (!fid_is_sane(&op_data->op_fid2) ||
                   !(it->it_create_mode & M_CHECK_STALE)) {
                /* DISP_ENQ_COMPLETE set means there is an extra reference on
                 * the request referenced from this intent, saved for a
                 * subsequent lookup.  This path is executed when we proceed
                 * to this lookup, so we clear DISP_ENQ_COMPLETE */
                it_clear_disposition(it, DISP_ENQ_COMPLETE);
        }
        *reqp = it->d.lustre.it_data;
        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
        RETURN(rc);
}

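/* Completion callback for asynchronous getattr intents (used by statahead):
 * finish the enqueue and the intent lock, then invoke the caller's mi_cb
 * with the result. */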
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
                                              struct ptlrpc_request *req,
                                              void *args, int rc)
{
        struct mdc_getattr_args  *ga = args;
        struct obd_export        *exp = ga->ga_exp;
        struct md_enqueue_info   *minfo = ga->ga_minfo;
        struct ldlm_enqueue_info *einfo = ga->ga_einfo;
        struct lookup_intent     *it;
        struct lustre_handle     *lockh;
        struct obd_device        *obddev;
        struct ldlm_reply        *lockrep;
        __u64                     flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        it    = &minfo->mi_it;
        lockh = &minfo->mi_lockh;

        obddev = class_exp2obd(exp);

        mdc_exit_request(&obddev->u.cli);
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                rc = -ETIMEDOUT;

        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                                   &flags, NULL, 0, lockh, rc);
        if (rc < 0) {
                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                GOTO(out, rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc)
                GOTO(out, rc);

        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
        EXIT;

out:
        OBD_FREE_PTR(einfo);
        minfo->mi_cb(req, minfo, rc);
        return 0;
}

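/* Issue an LDLM_INTENT_GETATTR enqueue asynchronously via ptlrpcd; the reply
 * is handled by mdc_intent_getattr_async_interpret(). */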
int mdc_intent_getattr_async(struct obd_export *exp,
                             struct md_enqueue_info *minfo,
                             struct ldlm_enqueue_info *einfo)
{
        struct md_op_data       *op_data = &minfo->mi_data;
        struct lookup_intent    *it = &minfo->mi_it;
        struct ptlrpc_request   *req;
        struct mdc_getattr_args *ga;
        struct obd_device       *obddev = class_exp2obd(exp);
        struct ldlm_res_id       res_id;
        /* XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
         *      for statahead currently.  With CMD in the future, these two
         *      bits may be managed by different MDSes, so this should be
         *      adjusted then. */
        ldlm_policy_data_t       policy = {
                                        .l_inodebits = { MDS_INODELOCK_LOOKUP |
                                                         MDS_INODELOCK_UPDATE }
                                 };
        int                      rc = 0;
        __u64                    flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#o\n",
               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
               ldlm_it2str(it->it_op), it->it_flags);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
        req = mdc_intent_getattr_pack(exp, it, op_data);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        rc = mdc_enter_request(&obddev->u.cli);
        if (rc != 0) {
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                              0, LVB_T_NONE, &minfo->mi_lockh, 1);
        if (rc < 0) {
                mdc_exit_request(&obddev->u.cli);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
        ga = ptlrpc_req_async_args(req);
        ga->ga_exp = exp;
        ga->ga_minfo = minfo;
        ga->ga_einfo = einfo;

        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

        RETURN(0);
}