Whamcloud - gitweb
8920c6c56328dcb99c201a1e6c526520c670e776
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Bundle of three pointers that travel together: an export, an md enqueue
 * info and an ldlm enqueue info.
 * NOTE(review): presumably the argument block of an asynchronous intent
 * getattr enqueue; its users are outside this part of the file - confirm.
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;
        struct md_enqueue_info   *ga_minfo;
        struct ldlm_enqueue_info *ga_einfo;
};
60
61 int it_disposition(struct lookup_intent *it, int flag)
62 {
63         return it->d.lustre.it_disposition & flag;
64 }
65 EXPORT_SYMBOL(it_disposition);
66
67 void it_set_disposition(struct lookup_intent *it, int flag)
68 {
69         it->d.lustre.it_disposition |= flag;
70 }
71 EXPORT_SYMBOL(it_set_disposition);
72
73 void it_clear_disposition(struct lookup_intent *it, int flag)
74 {
75         it->d.lustre.it_disposition &= ~flag;
76 }
77 EXPORT_SYMBOL(it_clear_disposition);
78
79 int it_open_error(int phase, struct lookup_intent *it)
80 {
81         if (it_disposition(it, DISP_OPEN_LEASE)) {
82                 if (phase >= DISP_OPEN_LEASE)
83                         return it->d.lustre.it_status;
84                 else
85                         return 0;
86         }
87         if (it_disposition(it, DISP_OPEN_OPEN)) {
88                 if (phase >= DISP_OPEN_OPEN)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93
94         if (it_disposition(it, DISP_OPEN_CREATE)) {
95                 if (phase >= DISP_OPEN_CREATE)
96                         return it->d.lustre.it_status;
97                 else
98                         return 0;
99         }
100
101         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102                 if (phase >= DISP_LOOKUP_EXECD)
103                         return it->d.lustre.it_status;
104                 else
105                         return 0;
106         }
107
108         if (it_disposition(it, DISP_IT_EXECD)) {
109                 if (phase >= DISP_IT_EXECD)
110                         return it->d.lustre.it_status;
111                 else
112                         return 0;
113         }
114         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115                it->d.lustre.it_status);
116         LBUG();
117         return 0;
118 }
119 EXPORT_SYMBOL(it_open_error);
120
121 /* this must be called on a lockh that is known to have a referenced lock */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123                       __u64 *bits)
124 {
125         struct ldlm_lock *lock;
126         struct inode *new_inode = data;
127         ENTRY;
128
129         if(bits)
130                 *bits = 0;
131
132         if (!*lockh)
133                 RETURN(0);
134
135         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
136
137         LASSERT(lock != NULL);
138         lock_res_and_lock(lock);
139 #ifdef __KERNEL__
140         if (lock->l_resource->lr_lvb_inode &&
141             lock->l_resource->lr_lvb_inode != data) {
142                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143                 LASSERTF(old_inode->i_state & I_FREEING,
144                          "Found existing inode %p/%lu/%u state %lu in lock: "
145                          "setting data to %p/%lu/%u\n", old_inode,
146                          old_inode->i_ino, old_inode->i_generation,
147                          old_inode->i_state,
148                          new_inode, new_inode->i_ino, new_inode->i_generation);
149         }
150 #endif
151         lock->l_resource->lr_lvb_inode = new_inode;
152         if (bits)
153                 *bits = lock->l_policy_data.l_inodebits.bits;
154
155         unlock_res_and_lock(lock);
156         LDLM_LOCK_PUT(lock);
157
158         RETURN(0);
159 }
160
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162                            const struct lu_fid *fid, ldlm_type_t type,
163                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
164                            struct lustre_handle *lockh)
165 {
166         struct ldlm_res_id res_id;
167         ldlm_mode_t rc;
168         ENTRY;
169
170         fid_build_reg_res_name(fid, &res_id);
171         /* LU-4405: Clear bits not supported by server */
172         policy->l_inodebits.bits &= exp_connect_ibits(exp);
173         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
174                              &res_id, type, policy, mode, lockh, 0);
175         RETURN(rc);
176 }
177
178 int mdc_cancel_unused(struct obd_export *exp,
179                       const struct lu_fid *fid,
180                       ldlm_policy_data_t *policy,
181                       ldlm_mode_t mode,
182                       ldlm_cancel_flags_t flags,
183                       void *opaque)
184 {
185         struct ldlm_res_id res_id;
186         struct obd_device *obd = class_exp2obd(exp);
187         int rc;
188
189         ENTRY;
190
191         fid_build_reg_res_name(fid, &res_id);
192         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
193                                              policy, mode, flags, opaque);
194         RETURN(rc);
195 }
196
197 int mdc_null_inode(struct obd_export *exp,
198                    const struct lu_fid *fid)
199 {
200         struct ldlm_res_id res_id;
201         struct ldlm_resource *res;
202         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
203         ENTRY;
204
205         LASSERTF(ns != NULL, "no namespace passed\n");
206
207         fid_build_reg_res_name(fid, &res_id);
208
209         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
210         if(res == NULL)
211                 RETURN(0);
212
213         lock_res(res);
214         res->lr_lvb_inode = NULL;
215         unlock_res(res);
216
217         ldlm_resource_putref(res);
218         RETURN(0);
219 }
220
221 /* find any ldlm lock of the inode in mdc
222  * return 0    not find
223  *        1    find one
224  *      < 0    error */
225 int mdc_find_cbdata(struct obd_export *exp,
226                     const struct lu_fid *fid,
227                     ldlm_iterator_t it, void *data)
228 {
229         struct ldlm_res_id res_id;
230         int rc = 0;
231         ENTRY;
232
233         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
234         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
235                                    it, data);
236         if (rc == LDLM_ITER_STOP)
237                 RETURN(1);
238         else if (rc == LDLM_ITER_CONTINUE)
239                 RETURN(0);
240         RETURN(rc);
241 }
242
243 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
244 {
245         /* Don't hold error requests for replay. */
246         if (req->rq_replay) {
247                 spin_lock(&req->rq_lock);
248                 req->rq_replay = 0;
249                 spin_unlock(&req->rq_lock);
250         }
251         if (rc && req->rq_transno != 0) {
252                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
253                 LBUG();
254         }
255 }
256
257 /* Save a large LOV EA into the request buffer so that it is available
258  * for replay.  We don't do this in the initial request because the
259  * original request doesn't need this buffer (at most it sends just the
260  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
261  * buffer and may also be difficult to allocate and save a very large
262  * request buffer for each open. (bug 5707)
263  *
264  * OOM here may cause recovery failure if lmm is needed (only for the
265  * original open if the MDS crashed just when this client also OOM'd)
266  * but this is incredibly unlikely, and questionable whether the client
267  * could do MDS recovery under OOM anyways... */
268 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
269                                 struct mdt_body *body)
270 {
271         int     rc;
272
273         /* FIXME: remove this explicit offset. */
274         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
275                                         body->eadatasize);
276         if (rc) {
277                 CERROR("Can't enlarge segment %d size to %d\n",
278                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
279                 body->valid &= ~OBD_MD_FLEASIZE;
280                 body->eadatasize = 0;
281         }
282 }
283
284 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
285                                                    struct lookup_intent *it,
286                                                    struct md_op_data *op_data,
287                                                    void *lmm, int lmmsize,
288                                                    void *cb_data)
289 {
290         struct ptlrpc_request *req;
291         struct obd_device     *obddev = class_exp2obd(exp);
292         struct ldlm_intent    *lit;
293         CFS_LIST_HEAD(cancels);
294         int                    count = 0;
295         int                    mode;
296         int                    rc;
297         ENTRY;
298
299         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
300
301         /* XXX: openlock is not cancelled for cross-refs. */
302         /* If inode is known, cancel conflicting OPEN locks. */
303         if (fid_is_sane(&op_data->op_fid2)) {
304                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
305                         if (it->it_flags & FMODE_WRITE)
306                                 mode = LCK_EX;
307                         else
308                                 mode = LCK_PR;
309                 } else {
310                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
311                                 mode = LCK_CW;
312 #ifdef FMODE_EXEC
313                         else if (it->it_flags & FMODE_EXEC)
314                                 mode = LCK_PR;
315 #endif
316                         else
317                                 mode = LCK_CR;
318                 }
319                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
320                                                 &cancels, mode,
321                                                 MDS_INODELOCK_OPEN);
322         }
323
324         /* If CREATE, cancel parent's UPDATE lock. */
325         if (it->it_op & IT_CREAT)
326                 mode = LCK_EX;
327         else
328                 mode = LCK_CR;
329         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
330                                          &cancels, mode,
331                                          MDS_INODELOCK_UPDATE);
332
333         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
334                                    &RQF_LDLM_INTENT_OPEN);
335         if (req == NULL) {
336                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
337                 RETURN(ERR_PTR(-ENOMEM));
338         }
339
340         /* parent capability */
341         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
342         /* child capability, reserve the size according to parent capa, it will
343          * be filled after we get the reply */
344         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
345
346         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
347                              op_data->op_namelen + 1);
348         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
349                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
350
351         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
352         if (rc < 0) {
353                 ptlrpc_request_free(req);
354                 RETURN(ERR_PTR(rc));
355         }
356
357         spin_lock(&req->rq_lock);
358         req->rq_replay = req->rq_import->imp_replayable;
359         spin_unlock(&req->rq_lock);
360
361         /* pack the intent */
362         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363         lit->opc = (__u64)it->it_op;
364
365         /* pack the intended request */
366         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
367                       lmmsize);
368
369         /* for remote client, fetch remote perm for current user */
370         if (client_is_remote(exp))
371                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
372                                      sizeof(struct mdt_remote_perm));
373         ptlrpc_request_set_replen(req);
374         return req;
375 }
376
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379                          struct lookup_intent *it,
380                          struct md_op_data *op_data)
381 {
382         struct ptlrpc_request   *req;
383         struct ldlm_intent      *lit;
384         int                     rc, count = 0, maxdata;
385         CFS_LIST_HEAD(cancels);
386
387         ENTRY;
388
389         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
390                                         &RQF_LDLM_INTENT_GETXATTR);
391         if (req == NULL)
392                 RETURN(ERR_PTR(-ENOMEM));
393
394         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
395
396         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
397         if (rc) {
398                 ptlrpc_request_free(req);
399                 RETURN(ERR_PTR(rc));
400         }
401
402         /* pack the intent */
403         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404         lit->opc = IT_GETXATTR;
405
406         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
407
408         /* pack the intended request */
409         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
410                         op_data->op_valid, maxdata, -1, 0);
411
412         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
413                                 RCL_SERVER, maxdata);
414
415         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
416                                 RCL_SERVER, maxdata);
417
418         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
419                                 RCL_SERVER, maxdata);
420
421         ptlrpc_request_set_replen(req);
422
423         RETURN(req);
424 }
425
426 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
427                                                      struct lookup_intent *it,
428                                                      struct md_op_data *op_data)
429 {
430         struct ptlrpc_request *req;
431         struct obd_device     *obddev = class_exp2obd(exp);
432         struct ldlm_intent    *lit;
433         int                    rc;
434         ENTRY;
435
436         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437                                    &RQF_LDLM_INTENT_UNLINK);
438         if (req == NULL)
439                 RETURN(ERR_PTR(-ENOMEM));
440
441         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
442         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
443                              op_data->op_namelen + 1);
444
445         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
446         if (rc) {
447                 ptlrpc_request_free(req);
448                 RETURN(ERR_PTR(rc));
449         }
450
451         /* pack the intent */
452         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
453         lit->opc = (__u64)it->it_op;
454
455         /* pack the intended request */
456         mdc_unlink_pack(req, op_data);
457
458         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
459                              obddev->u.cli.cl_max_mds_easize);
460         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
461                              obddev->u.cli.cl_max_mds_cookiesize);
462         ptlrpc_request_set_replen(req);
463         RETURN(req);
464 }
465
466 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
467                                                       struct lookup_intent *it,
468                                                       struct md_op_data *op_data)
469 {
470         struct ptlrpc_request *req;
471         struct obd_device     *obddev = class_exp2obd(exp);
472         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
473                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
474                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
475                                        (client_is_remote(exp) ?
476                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
477         struct ldlm_intent    *lit;
478         int                    rc;
479         ENTRY;
480
481         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
482                                    &RQF_LDLM_INTENT_GETATTR);
483         if (req == NULL)
484                 RETURN(ERR_PTR(-ENOMEM));
485
486         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
487         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
488                              op_data->op_namelen + 1);
489
490         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
491         if (rc) {
492                 ptlrpc_request_free(req);
493                 RETURN(ERR_PTR(rc));
494         }
495
496         /* pack the intent */
497         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498         lit->opc = (__u64)it->it_op;
499
500         /* pack the intended request */
501         mdc_getattr_pack(req, valid, it->it_flags, op_data,
502                          obddev->u.cli.cl_max_mds_easize);
503
504         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
505                              obddev->u.cli.cl_max_mds_easize);
506         if (client_is_remote(exp))
507                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
508                                      sizeof(struct mdt_remote_perm));
509         ptlrpc_request_set_replen(req);
510         RETURN(req);
511 }
512
513 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
514                                                      struct lookup_intent *it,
515                                                      struct md_op_data *unused)
516 {
517         struct obd_device     *obd = class_exp2obd(exp);
518         struct ptlrpc_request *req;
519         struct ldlm_intent    *lit;
520         struct layout_intent  *layout;
521         int rc;
522         ENTRY;
523
524         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
525                                 &RQF_LDLM_INTENT_LAYOUT);
526         if (req == NULL)
527                 RETURN(ERR_PTR(-ENOMEM));
528
529         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
530         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(ERR_PTR(rc));
534         }
535
536         /* pack the intent */
537         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
538         lit->opc = (__u64)it->it_op;
539
540         /* pack the layout intent request */
541         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
542         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
543          * set for replication */
544         layout->li_opc = LAYOUT_INTENT_ACCESS;
545
546         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
547                         obd->u.cli.cl_max_mds_easize);
548         ptlrpc_request_set_replen(req);
549         RETURN(req);
550 }
551
552 static struct ptlrpc_request *
553 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
554 {
555         struct ptlrpc_request *req;
556         int rc;
557         ENTRY;
558
559         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
560         if (req == NULL)
561                 RETURN(ERR_PTR(-ENOMEM));
562
563         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
564         if (rc) {
565                 ptlrpc_request_free(req);
566                 RETURN(ERR_PTR(rc));
567         }
568
569         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
570         ptlrpc_request_set_replen(req);
571         RETURN(req);
572 }
573
574 static int mdc_finish_enqueue(struct obd_export *exp,
575                               struct ptlrpc_request *req,
576                               struct ldlm_enqueue_info *einfo,
577                               struct lookup_intent *it,
578                               struct lustre_handle *lockh,
579                               int rc)
580 {
581         struct req_capsule  *pill = &req->rq_pill;
582         struct ldlm_request *lockreq;
583         struct ldlm_reply   *lockrep;
584         struct lustre_intent_data *intent = &it->d.lustre;
585         struct ldlm_lock    *lock;
586         void                *lvb_data = NULL;
587         int                  lvb_len = 0;
588         ENTRY;
589
590         LASSERT(rc >= 0);
591         /* Similarly, if we're going to replay this request, we don't want to
592          * actually get a lock, just perform the intent. */
593         if (req->rq_transno || req->rq_replay) {
594                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
595                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
596         }
597
598         if (rc == ELDLM_LOCK_ABORTED) {
599                 einfo->ei_mode = 0;
600                 memset(lockh, 0, sizeof(*lockh));
601                 rc = 0;
602         } else { /* rc = 0 */
603                 lock = ldlm_handle2lock(lockh);
604                 LASSERT(lock != NULL);
605
606                 /* If the server gave us back a different lock mode, we should
607                  * fix up our variables. */
608                 if (lock->l_req_mode != einfo->ei_mode) {
609                         ldlm_lock_addref(lockh, lock->l_req_mode);
610                         ldlm_lock_decref(lockh, einfo->ei_mode);
611                         einfo->ei_mode = lock->l_req_mode;
612                 }
613                 LDLM_LOCK_PUT(lock);
614         }
615
616         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
617         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
618
619         intent->it_disposition = (int)lockrep->lock_policy_res1;
620         intent->it_status = (int)lockrep->lock_policy_res2;
621         intent->it_lock_mode = einfo->ei_mode;
622         intent->it_lock_handle = lockh->cookie;
623         intent->it_data = req;
624
625         /* Technically speaking rq_transno must already be zero if
626          * it_status is in error, so the check is a bit redundant */
627         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
628                 mdc_clear_replay_flag(req, intent->it_status);
629
630         /* If we're doing an IT_OPEN which did not result in an actual
631          * successful open, then we need to remove the bit which saves
632          * this request for unconditional replay.
633          *
634          * It's important that we do this first!  Otherwise we might exit the
635          * function without doing so, and try to replay a failed create
636          * (bug 3440) */
637         if (it->it_op & IT_OPEN && req->rq_replay &&
638             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
639                 mdc_clear_replay_flag(req, intent->it_status);
640
641         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
642                   it->it_op, intent->it_disposition, intent->it_status);
643
644         /* We know what to expect, so we do any byte flipping required here */
645         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
646                 struct mdt_body *body;
647
648                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
649                 if (body == NULL) {
650                         CERROR ("Can't swab mdt_body\n");
651                         RETURN (-EPROTO);
652                 }
653
654                 if (it_disposition(it, DISP_OPEN_OPEN) &&
655                     !it_open_error(DISP_OPEN_OPEN, it)) {
656                         /*
657                          * If this is a successful OPEN request, we need to set
658                          * replay handler and data early, so that if replay
659                          * happens immediately after swabbing below, new reply
660                          * is swabbed by that handler correctly.
661                          */
662                         mdc_set_open_replay_data(NULL, NULL, it);
663                 }
664
665                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
666                         void *eadata;
667
668                         mdc_update_max_ea_from_body(exp, body);
669
670                         /*
671                          * The eadata is opaque; just check that it is there.
672                          * Eventually, obd_unpackmd() will check the contents.
673                          */
674                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
675                                                               body->eadatasize);
676                         if (eadata == NULL)
677                                 RETURN(-EPROTO);
678
679                         /* save lvb data and length in case this is for layout
680                          * lock */
681                         lvb_data = eadata;
682                         lvb_len = body->eadatasize;
683
684                         /*
685                          * We save the reply LOV EA in case we have to replay a
686                          * create for recovery.  If we didn't allocate a large
687                          * enough request buffer above we need to reallocate it
688                          * here to hold the actual LOV EA.
689                          *
690                          * To not save LOV EA if request is not going to replay
691                          * (for example error one).
692                          */
693                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
694                                 void *lmm;
695                                 if (req_capsule_get_size(pill, &RMF_EADATA,
696                                                          RCL_CLIENT) <
697                                     body->eadatasize)
698                                         mdc_realloc_openmsg(req, body);
699                                 else
700                                         req_capsule_shrink(pill, &RMF_EADATA,
701                                                            body->eadatasize,
702                                                            RCL_CLIENT);
703
704                                 req_capsule_set_size(pill, &RMF_EADATA,
705                                                      RCL_CLIENT,
706                                                      body->eadatasize);
707
708                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
709                                 if (lmm)
710                                         memcpy(lmm, eadata, body->eadatasize);
711                         }
712                 }
713
714                 if (body->valid & OBD_MD_FLRMTPERM) {
715                         struct mdt_remote_perm *perm;
716
717                         LASSERT(client_is_remote(exp));
718                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
719                                                 lustre_swab_mdt_remote_perm);
720                         if (perm == NULL)
721                                 RETURN(-EPROTO);
722                 }
723                 if (body->valid & OBD_MD_FLMDSCAPA) {
724                         struct lustre_capa *capa, *p;
725
726                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
727                         if (capa == NULL)
728                                 RETURN(-EPROTO);
729
730                         if (it->it_op & IT_OPEN) {
731                                 /* client fid capa will be checked in replay */
732                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
733                                 LASSERT(p);
734                                 *p = *capa;
735                         }
736                 }
737                 if (body->valid & OBD_MD_FLOSSCAPA) {
738                         struct lustre_capa *capa;
739
740                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
741                         if (capa == NULL)
742                                 RETURN(-EPROTO);
743                 }
744         } else if (it->it_op & IT_LAYOUT) {
745                 /* maybe the lock was granted right away and layout
746                  * is packed into RMF_DLM_LVB of req */
747                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
748                 if (lvb_len > 0) {
749                         lvb_data = req_capsule_server_sized_get(pill,
750                                                         &RMF_DLM_LVB, lvb_len);
751                         if (lvb_data == NULL)
752                                 RETURN(-EPROTO);
753                 }
754         }
755
756         /* fill in stripe data for layout lock */
757         lock = ldlm_handle2lock(lockh);
758         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
759                 void *lmm;
760
761                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
762                         ldlm_it2str(it->it_op), lvb_len);
763
764                 OBD_ALLOC_LARGE(lmm, lvb_len);
765                 if (lmm == NULL) {
766                         LDLM_LOCK_PUT(lock);
767                         RETURN(-ENOMEM);
768                 }
769                 memcpy(lmm, lvb_data, lvb_len);
770
771                 /* install lvb_data */
772                 lock_res_and_lock(lock);
773                 if (lock->l_lvb_data == NULL) {
774                         lock->l_lvb_type = LVB_T_LAYOUT;
775                         lock->l_lvb_data = lmm;
776                         lock->l_lvb_len = lvb_len;
777                         lmm = NULL;
778                 }
779                 unlock_res_and_lock(lock);
780                 if (lmm != NULL)
781                         OBD_FREE_LARGE(lmm, lvb_len);
782         }
783         if (lock != NULL)
784                 LDLM_LOCK_PUT(lock);
785
786         RETURN(rc);
787 }
788
789 /* We always reserve enough space in the reply packet for a stripe MD, because
790  * we don't know in advance the file type. */
791 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
792                 struct lookup_intent *it, struct md_op_data *op_data,
793                 struct lustre_handle *lockh, void *lmm, int lmmsize,
794                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
795 {
796         struct obd_device     *obddev = class_exp2obd(exp);
797         struct ptlrpc_request *req = NULL;
798         __u64                  flags, saved_flags = extra_lock_flags;
799         int                    rc;
800         struct ldlm_res_id res_id;
801         static const ldlm_policy_data_t lookup_policy =
802                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
803         static const ldlm_policy_data_t update_policy =
804                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
805         static const ldlm_policy_data_t layout_policy =
806                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
807         static const ldlm_policy_data_t getxattr_policy = {
808                               .l_inodebits = { MDS_INODELOCK_XATTR } };
809         ldlm_policy_data_t const *policy = &lookup_policy;
810         int                    generation, resends = 0;
811         struct ldlm_reply     *lockrep;
812         enum lvb_type          lvb_type = 0;
813         ENTRY;
814
815         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
816                  einfo->ei_type);
817
818         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
819
820         if (it) {
821                 saved_flags |= LDLM_FL_HAS_INTENT;
822                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
823                         policy = &update_policy;
824                 else if (it->it_op & IT_LAYOUT)
825                         policy = &layout_policy;
826                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
827                         policy = &getxattr_policy;
828         }
829
830         LASSERT(reqp == NULL);
831
832         generation = obddev->u.cli.cl_import->imp_generation;
833 resend:
834         flags = saved_flags;
835         if (!it) {
836                 /* The only way right now is FLOCK, in this case we hide flock
837                    policy as lmm, but lmmsize is 0 */
838                 LASSERT(lmm && lmmsize == 0);
839                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
840                          einfo->ei_type);
841                 policy = (ldlm_policy_data_t *)lmm;
842                 res_id.name[3] = LDLM_FLOCK;
843         } else if (it->it_op & IT_OPEN) {
844                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
845                                            einfo->ei_cbdata);
846                 policy = &update_policy;
847                 einfo->ei_cbdata = NULL;
848                 lmm = NULL;
849         } else if (it->it_op & IT_UNLINK) {
850                 req = mdc_intent_unlink_pack(exp, it, op_data);
851         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
852                 req = mdc_intent_getattr_pack(exp, it, op_data);
853         } else if (it->it_op & IT_READDIR) {
854                 req = mdc_enqueue_pack(exp, 0);
855         } else if (it->it_op & IT_LAYOUT) {
856                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
857                         RETURN(-EOPNOTSUPP);
858                 req = mdc_intent_layout_pack(exp, it, op_data);
859                 lvb_type = LVB_T_LAYOUT;
860         } else if (it->it_op & IT_GETXATTR) {
861                 req = mdc_intent_getxattr_pack(exp, it, op_data);
862         } else {
863                 LBUG();
864                 RETURN(-EINVAL);
865         }
866
867         if (IS_ERR(req))
868                 RETURN(PTR_ERR(req));
869
870         if (req != NULL && it && it->it_op & IT_CREAT)
871                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
872                  * retry logic */
873                 req->rq_no_retry_einprogress = 1;
874
875         if (resends) {
876                 req->rq_generation_set = 1;
877                 req->rq_import_generation = generation;
878                 req->rq_sent = cfs_time_current_sec() + resends;
879         }
880
881         /* It is important to obtain rpc_lock first (if applicable), so that
882          * threads that are serialised with rpc_lock are not polluting our
883          * rpcs in flight counter. We do not do flock request limiting, though*/
884         if (it) {
885                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
886                 rc = mdc_enter_request(&obddev->u.cli);
887                 if (rc != 0) {
888                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
889                         mdc_clear_replay_flag(req, 0);
890                         ptlrpc_req_finished(req);
891                         RETURN(rc);
892                 }
893         }
894
895         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
896                               0, lvb_type, lockh, 0);
897         if (!it) {
898                 /* For flock requests we immediatelly return without further
899                    delay and let caller deal with the rest, since rest of
900                    this function metadata processing makes no sense for flock
901                    requests anyway. But in case of problem during comms with
902                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
903                    can not rely on caller and this mainly for F_UNLCKs
904                    (explicits or automatically generated by Kernel to clean
905                    current FLocks upon exit) that can't be trashed */
906                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
907                     (einfo->ei_type == LDLM_FLOCK) &&
908                     (einfo->ei_mode == LCK_NL))
909                         goto resend;
910                 RETURN(rc);
911         }
912
913         mdc_exit_request(&obddev->u.cli);
914         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
915
916         if (rc < 0) {
917                 CERROR("ldlm_cli_enqueue: %d\n", rc);
918                 mdc_clear_replay_flag(req, rc);
919                 ptlrpc_req_finished(req);
920                 RETURN(rc);
921         }
922
923         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
924         LASSERT(lockrep != NULL);
925
926         lockrep->lock_policy_res2 =
927                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
928
929         /* Retry the create infinitely when we get -EINPROGRESS from
930          * server. This is required by the new quota design. */
931         if (it && it->it_op & IT_CREAT &&
932             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
933                 mdc_clear_replay_flag(req, rc);
934                 ptlrpc_req_finished(req);
935                 resends++;
936
937                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
938                        obddev->obd_name, resends, it->it_op,
939                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
940
941                 if (generation == obddev->u.cli.cl_import->imp_generation) {
942                         goto resend;
943                 } else {
944                         CDEBUG(D_HA, "resend cross eviction\n");
945                         RETURN(-EIO);
946                 }
947         }
948
949         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
950         if (rc < 0) {
951                 if (lustre_handle_is_used(lockh)) {
952                         ldlm_lock_decref(lockh, einfo->ei_mode);
953                         memset(lockh, 0, sizeof(*lockh));
954                 }
955                 ptlrpc_req_finished(req);
956         }
957         RETURN(rc);
958 }
959
960 static int mdc_finish_intent_lock(struct obd_export *exp,
961                                   struct ptlrpc_request *request,
962                                   struct md_op_data *op_data,
963                                   struct lookup_intent *it,
964                                   struct lustre_handle *lockh)
965 {
966         struct lustre_handle old_lock;
967         struct mdt_body *mdt_body;
968         struct ldlm_lock *lock;
969         int rc;
970         ENTRY;
971
972         LASSERT(request != NULL);
973         LASSERT(request != LP_POISON);
974         LASSERT(request->rq_repmsg != LP_POISON);
975
976         if (it->it_op & IT_READDIR)
977                 RETURN(0);
978
979         if (!it_disposition(it, DISP_IT_EXECD)) {
980                 /* The server failed before it even started executing the
981                  * intent, i.e. because it couldn't unpack the request. */
982                 LASSERT(it->d.lustre.it_status != 0);
983                 RETURN(it->d.lustre.it_status);
984         }
985         rc = it_open_error(DISP_IT_EXECD, it);
986         if (rc)
987                 RETURN(rc);
988
989         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
990         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
991
992         /* If we were revalidating a fid/name pair, mark the intent in
993          * case we fail and get called again from lookup */
994         if (fid_is_sane(&op_data->op_fid2) &&
995             it->it_create_mode & M_CHECK_STALE &&
996             it->it_op != IT_GETATTR) {
997                 /* Also: did we find the same inode? */
998                 /* sever can return one of two fids:
999                  * op_fid2 - new allocated fid - if file is created.
1000                  * op_fid3 - existent fid - if file only open.
1001                  * op_fid3 is saved in lmv_intent_open */
1002                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1003                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1004                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1005                                "\n", PFID(&op_data->op_fid2),
1006                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1007                         RETURN(-ESTALE);
1008                 }
1009         }
1010
1011         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1012         if (rc)
1013                 RETURN(rc);
1014
1015         /* keep requests around for the multiple phases of the call
1016          * this shows the DISP_XX must guarantee we make it into the call
1017          */
1018         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1019             it_disposition(it, DISP_OPEN_CREATE) &&
1020             !it_open_error(DISP_OPEN_CREATE, it)) {
1021                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1022                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1023         }
1024         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1025             it_disposition(it, DISP_OPEN_OPEN) &&
1026             !it_open_error(DISP_OPEN_OPEN, it)) {
1027                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1028                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1029                 /* BUG 11546 - eviction in the middle of open rpc processing */
1030                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1031         }
1032
1033         if (it->it_op & IT_CREAT) {
1034                 /* XXX this belongs in ll_create_it */
1035         } else if (it->it_op == IT_OPEN) {
1036                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1037         } else {
1038                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1039         }
1040
1041         /* If we already have a matching lock, then cancel the new
1042          * one.  We have to set the data here instead of in
1043          * mdc_enqueue, because we need to use the child's inode as
1044          * the l_ast_data to match, and that's not available until
1045          * intent_finish has performed the iget().) */
1046         lock = ldlm_handle2lock(lockh);
1047         if (lock) {
1048                 ldlm_policy_data_t policy = lock->l_policy_data;
1049                 LDLM_DEBUG(lock, "matching against this");
1050
1051                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1052                                          &lock->l_resource->lr_name),
1053                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1054                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1055                 LDLM_LOCK_PUT(lock);
1056
1057                 memcpy(&old_lock, lockh, sizeof(*lockh));
1058                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1059                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1060                         ldlm_lock_decref_and_cancel(lockh,
1061                                                     it->d.lustre.it_lock_mode);
1062                         memcpy(lockh, &old_lock, sizeof(old_lock));
1063                         it->d.lustre.it_lock_handle = lockh->cookie;
1064                 }
1065         }
1066         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1067                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1068                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1069         RETURN(rc);
1070 }
1071
1072 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1073                         struct lu_fid *fid, __u64 *bits)
1074 {
1075         /* We could just return 1 immediately, but since we should only
1076          * be called in revalidate_it if we already have a lock, let's
1077          * verify that. */
1078         struct ldlm_res_id res_id;
1079         struct lustre_handle lockh;
1080         ldlm_policy_data_t policy;
1081         ldlm_mode_t mode;
1082         ENTRY;
1083
1084         if (it->d.lustre.it_lock_handle) {
1085                 lockh.cookie = it->d.lustre.it_lock_handle;
1086                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1087         } else {
1088                 fid_build_reg_res_name(fid, &res_id);
1089                 switch (it->it_op) {
1090                 case IT_GETATTR:
1091                         /* File attributes are held under multiple bits:
1092                          * nlink is under lookup lock, size and times are
1093                          * under UPDATE lock and recently we've also got
1094                          * a separate permissions lock for owner/group/acl that
1095                          * were protected by lookup lock before.
1096                          * Getattr must provide all of that information,
1097                          * so we need to ensure we have all of those locks.
1098                          * Unfortunately, if the bits are split across multiple
1099                          * locks, there's no easy way to match all of them here,
1100                          * so an extra RPC would be performed to fetch all
1101                          * of those bits at once for now. */
1102                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1103                          * but for old MDTs (< 2.4), permission is covered
1104                          * by LOOKUP lock, so it needs to match all bits here.*/
1105                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1106                                                   MDS_INODELOCK_LOOKUP |
1107                                                   MDS_INODELOCK_PERM;
1108                         break;
1109                 case IT_READDIR:
1110                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1111                         break;
1112                 case IT_LAYOUT:
1113                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1114                         break;
1115                 default:
1116                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1117                         break;
1118                 }
1119
1120                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1121                                       LDLM_IBITS, &policy,
1122                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1123                                       &lockh);
1124         }
1125
1126         if (mode) {
1127                 it->d.lustre.it_lock_handle = lockh.cookie;
1128                 it->d.lustre.it_lock_mode = mode;
1129         } else {
1130                 it->d.lustre.it_lock_handle = 0;
1131                 it->d.lustre.it_lock_mode = 0;
1132         }
1133
1134         RETURN(!!mode);
1135 }
1136
1137 /*
1138  * This long block is all about fixing up the lock and request state
1139  * so that it is correct as of the moment _before_ the operation was
1140  * applied; that way, the VFS will think that everything is normal and
1141  * call Lustre's regular VFS methods.
1142  *
1143  * If we're performing a creation, that means that unless the creation
1144  * failed with EEXIST, we should fake up a negative dentry.
1145  *
1146  * For everything else, we want to lookup to succeed.
1147  *
1148  * One additional note: if CREATE or OPEN succeeded, we add an extra
1149  * reference to the request because we need to keep it around until
1150  * ll_create/ll_open gets called.
1151  *
1152  * The server will return to us, in it_disposition, an indication of
1153  * exactly what d.lustre.it_status refers to.
1154  *
1155  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1156  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1157  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1158  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1159  * was successful.
1160  *
1161  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1162  * child lookup.
1163  */
1164 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1165                     void *lmm, int lmmsize, struct lookup_intent *it,
1166                     int lookup_flags, struct ptlrpc_request **reqp,
1167                     ldlm_blocking_callback cb_blocking,
1168                     __u64 extra_lock_flags)
1169 {
1170         struct ldlm_enqueue_info einfo = {
1171                 .ei_type        = LDLM_IBITS,
1172                 .ei_mode        = it_to_lock_mode(it),
1173                 .ei_cb_bl       = cb_blocking,
1174                 .ei_cb_cp       = ldlm_completion_ast,
1175         };
1176         struct lustre_handle lockh;
1177         int rc = 0;
1178         ENTRY;
1179         LASSERT(it);
1180
1181         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1182                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1183                 op_data->op_name, PFID(&op_data->op_fid2),
1184                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1185                 it->it_flags);
1186
1187         lockh.cookie = 0;
1188         if (fid_is_sane(&op_data->op_fid2) &&
1189             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1190                 /* We could just return 1 immediately, but since we should only
1191                  * be called in revalidate_it if we already have a lock, let's
1192                  * verify that. */
1193                 it->d.lustre.it_lock_handle = 0;
1194                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1195                 /* Only return failure if it was not GETATTR by cfid
1196                    (from inode_revalidate) */
1197                 if (rc || op_data->op_namelen != 0)
1198                         RETURN(rc);
1199         }
1200
1201         /* For case if upper layer did not alloc fid, do it now. */
1202         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1203                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1204                 if (rc < 0) {
1205                         CERROR("Can't alloc new fid, rc %d\n", rc);
1206                         RETURN(rc);
1207                 }
1208         }
1209         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1210                          extra_lock_flags);
1211         if (rc < 0)
1212                 RETURN(rc);
1213
1214         *reqp = it->d.lustre.it_data;
1215         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1216         RETURN(rc);
1217 }
1218
1219 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1220                                               struct ptlrpc_request *req,
1221                                               void *args, int rc)
1222 {
1223         struct mdc_getattr_args  *ga = args;
1224         struct obd_export        *exp = ga->ga_exp;
1225         struct md_enqueue_info   *minfo = ga->ga_minfo;
1226         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1227         struct lookup_intent     *it;
1228         struct lustre_handle     *lockh;
1229         struct obd_device        *obddev;
1230         struct ldlm_reply        *lockrep;
1231         __u64                     flags = LDLM_FL_HAS_INTENT;
1232         ENTRY;
1233
1234         it    = &minfo->mi_it;
1235         lockh = &minfo->mi_lockh;
1236
1237         obddev = class_exp2obd(exp);
1238
1239         mdc_exit_request(&obddev->u.cli);
1240         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1241                 rc = -ETIMEDOUT;
1242
1243         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1244                                    &flags, NULL, 0, lockh, rc);
1245         if (rc < 0) {
1246                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1247                 mdc_clear_replay_flag(req, rc);
1248                 GOTO(out, rc);
1249         }
1250
1251         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1252         LASSERT(lockrep != NULL);
1253
1254         lockrep->lock_policy_res2 =
1255                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1256
1257         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1258         if (rc)
1259                 GOTO(out, rc);
1260
1261         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1262         EXIT;
1263
1264 out:
1265         OBD_FREE_PTR(einfo);
1266         minfo->mi_cb(req, minfo, rc);
1267         return 0;
1268 }
1269
1270 int mdc_intent_getattr_async(struct obd_export *exp,
1271                              struct md_enqueue_info *minfo,
1272                              struct ldlm_enqueue_info *einfo)
1273 {
1274         struct md_op_data       *op_data = &minfo->mi_data;
1275         struct lookup_intent    *it = &minfo->mi_it;
1276         struct ptlrpc_request   *req;
1277         struct mdc_getattr_args *ga;
1278         struct obd_device       *obddev = class_exp2obd(exp);
1279         struct ldlm_res_id       res_id;
1280         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1281          *     for statahead currently. Consider CMD in future, such two bits
1282          *     maybe managed by different MDS, should be adjusted then. */
1283         ldlm_policy_data_t       policy = {
1284                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1285                                                          MDS_INODELOCK_UPDATE }
1286                                  };
1287         int                      rc = 0;
1288         __u64                    flags = LDLM_FL_HAS_INTENT;
1289         ENTRY;
1290
1291         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1292                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1293                 ldlm_it2str(it->it_op), it->it_flags);
1294
1295         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1296         req = mdc_intent_getattr_pack(exp, it, op_data);
1297         if (IS_ERR(req))
1298                 RETURN(PTR_ERR(req));
1299
1300         rc = mdc_enter_request(&obddev->u.cli);
1301         if (rc != 0) {
1302                 ptlrpc_req_finished(req);
1303                 RETURN(rc);
1304         }
1305
1306         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1307                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1308         if (rc < 0) {
1309                 mdc_exit_request(&obddev->u.cli);
1310                 ptlrpc_req_finished(req);
1311                 RETURN(rc);
1312         }
1313
1314         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1315         ga = ptlrpc_req_async_args(req);
1316         ga->ga_exp = exp;
1317         ga->ga_minfo = minfo;
1318         ga->ga_einfo = einfo;
1319
1320         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1321         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1322
1323         RETURN(0);
1324 }