Whamcloud - gitweb
LU-3529 lod: create striped directory
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Bundle of three pointers: an export, an md enqueue info and an ldlm
 * enqueue info.
 * NOTE(review): the structure is not referenced in the visible part of this
 * file; the name suggests it carries per-request state for a getattr
 * enqueue - confirm against its users.
 */
55 struct mdc_getattr_args {
56         struct obd_export           *ga_exp;
57         struct md_enqueue_info      *ga_minfo;
58         struct ldlm_enqueue_info    *ga_einfo;
59 };
60
61 int it_disposition(struct lookup_intent *it, int flag)
62 {
63         return it->d.lustre.it_disposition & flag;
64 }
65 EXPORT_SYMBOL(it_disposition);
66
67 void it_set_disposition(struct lookup_intent *it, int flag)
68 {
69         it->d.lustre.it_disposition |= flag;
70 }
71 EXPORT_SYMBOL(it_set_disposition);
72
73 void it_clear_disposition(struct lookup_intent *it, int flag)
74 {
75         it->d.lustre.it_disposition &= ~flag;
76 }
77 EXPORT_SYMBOL(it_clear_disposition);
78
79 int it_open_error(int phase, struct lookup_intent *it)
80 {
81         if (it_disposition(it, DISP_OPEN_LEASE)) {
82                 if (phase >= DISP_OPEN_LEASE)
83                         return it->d.lustre.it_status;
84                 else
85                         return 0;
86         }
87         if (it_disposition(it, DISP_OPEN_OPEN)) {
88                 if (phase >= DISP_OPEN_OPEN)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93
94         if (it_disposition(it, DISP_OPEN_CREATE)) {
95                 if (phase >= DISP_OPEN_CREATE)
96                         return it->d.lustre.it_status;
97                 else
98                         return 0;
99         }
100
101         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102                 if (phase >= DISP_LOOKUP_EXECD)
103                         return it->d.lustre.it_status;
104                 else
105                         return 0;
106         }
107
108         if (it_disposition(it, DISP_IT_EXECD)) {
109                 if (phase >= DISP_IT_EXECD)
110                         return it->d.lustre.it_status;
111                 else
112                         return 0;
113         }
114         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115                it->d.lustre.it_status);
116         LBUG();
117         return 0;
118 }
119 EXPORT_SYMBOL(it_open_error);
120
121 /* this must be called on a lockh that is known to have a referenced lock */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123                       __u64 *bits)
124 {
125         struct ldlm_lock *lock;
126         struct inode *new_inode = data;
127         ENTRY;
128
129         if(bits)
130                 *bits = 0;
131
132         if (!*lockh)
133                 RETURN(0);
134
135         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
136
137         LASSERT(lock != NULL);
138         lock_res_and_lock(lock);
139 #ifdef __KERNEL__
140         if (lock->l_resource->lr_lvb_inode &&
141             lock->l_resource->lr_lvb_inode != data) {
142                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143                 LASSERTF(old_inode->i_state & I_FREEING,
144                          "Found existing inode %p/%lu/%u state %lu in lock: "
145                          "setting data to %p/%lu/%u\n", old_inode,
146                          old_inode->i_ino, old_inode->i_generation,
147                          old_inode->i_state,
148                          new_inode, new_inode->i_ino, new_inode->i_generation);
149         }
150 #endif
151         lock->l_resource->lr_lvb_inode = new_inode;
152         if (bits)
153                 *bits = lock->l_policy_data.l_inodebits.bits;
154
155         unlock_res_and_lock(lock);
156         LDLM_LOCK_PUT(lock);
157
158         RETURN(0);
159 }
160
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162                            const struct lu_fid *fid, ldlm_type_t type,
163                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
164                            struct lustre_handle *lockh)
165 {
166         struct ldlm_res_id res_id;
167         ldlm_mode_t rc;
168         ENTRY;
169
170         fid_build_reg_res_name(fid, &res_id);
171         /* LU-4405: Clear bits not supported by server */
172         policy->l_inodebits.bits &= exp_connect_ibits(exp);
173         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
174                              &res_id, type, policy, mode, lockh, 0);
175         RETURN(rc);
176 }
177
178 int mdc_cancel_unused(struct obd_export *exp,
179                       const struct lu_fid *fid,
180                       ldlm_policy_data_t *policy,
181                       ldlm_mode_t mode,
182                       ldlm_cancel_flags_t flags,
183                       void *opaque)
184 {
185         struct ldlm_res_id res_id;
186         struct obd_device *obd = class_exp2obd(exp);
187         int rc;
188
189         ENTRY;
190
191         fid_build_reg_res_name(fid, &res_id);
192         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
193                                              policy, mode, flags, opaque);
194         RETURN(rc);
195 }
196
197 int mdc_null_inode(struct obd_export *exp,
198                    const struct lu_fid *fid)
199 {
200         struct ldlm_res_id res_id;
201         struct ldlm_resource *res;
202         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
203         ENTRY;
204
205         LASSERTF(ns != NULL, "no namespace passed\n");
206
207         fid_build_reg_res_name(fid, &res_id);
208
209         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
210         if(res == NULL)
211                 RETURN(0);
212
213         lock_res(res);
214         res->lr_lvb_inode = NULL;
215         unlock_res(res);
216
217         ldlm_resource_putref(res);
218         RETURN(0);
219 }
220
221 /* find any ldlm lock of the inode in mdc
222  * return 0    not find
223  *        1    find one
224  *      < 0    error */
225 int mdc_find_cbdata(struct obd_export *exp,
226                     const struct lu_fid *fid,
227                     ldlm_iterator_t it, void *data)
228 {
229         struct ldlm_res_id res_id;
230         int rc = 0;
231         ENTRY;
232
233         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
234         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
235                                    it, data);
236         if (rc == LDLM_ITER_STOP)
237                 RETURN(1);
238         else if (rc == LDLM_ITER_CONTINUE)
239                 RETURN(0);
240         RETURN(rc);
241 }
242
243 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
244 {
245         /* Don't hold error requests for replay. */
246         if (req->rq_replay) {
247                 spin_lock(&req->rq_lock);
248                 req->rq_replay = 0;
249                 spin_unlock(&req->rq_lock);
250         }
251         if (rc && req->rq_transno != 0) {
252                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
253                 LBUG();
254         }
255 }
256
257 /* Save a large LOV EA into the request buffer so that it is available
258  * for replay.  We don't do this in the initial request because the
259  * original request doesn't need this buffer (at most it sends just the
260  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
261  * buffer and may also be difficult to allocate and save a very large
262  * request buffer for each open. (bug 5707)
263  *
264  * OOM here may cause recovery failure if lmm is needed (only for the
265  * original open if the MDS crashed just when this client also OOM'd)
266  * but this is incredibly unlikely, and questionable whether the client
267  * could do MDS recovery under OOM anyways... */
268 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
269                                 struct mdt_body *body)
270 {
271         int     rc;
272
273         /* FIXME: remove this explicit offset. */
274         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
275                                         body->eadatasize);
276         if (rc) {
277                 CERROR("Can't enlarge segment %d size to %d\n",
278                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
279                 body->valid &= ~OBD_MD_FLEASIZE;
280                 body->eadatasize = 0;
281         }
282 }
283
284 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
285                                                    struct lookup_intent *it,
286                                                    struct md_op_data *op_data,
287                                                    void *lmm, int lmmsize,
288                                                    void *cb_data)
289 {
290         struct ptlrpc_request *req;
291         struct obd_device     *obddev = class_exp2obd(exp);
292         struct ldlm_intent    *lit;
293         CFS_LIST_HEAD(cancels);
294         int                    count = 0;
295         int                    mode;
296         int                    rc;
297         ENTRY;
298
299         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
300
301         /* XXX: openlock is not cancelled for cross-refs. */
302         /* If inode is known, cancel conflicting OPEN locks. */
303         if (fid_is_sane(&op_data->op_fid2)) {
304                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
305                         if (it->it_flags & FMODE_WRITE)
306                                 mode = LCK_EX;
307                         else
308                                 mode = LCK_PR;
309                 } else {
310                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
311                                 mode = LCK_CW;
312 #ifdef FMODE_EXEC
313                         else if (it->it_flags & FMODE_EXEC)
314                                 mode = LCK_PR;
315 #endif
316                         else
317                                 mode = LCK_CR;
318                 }
319                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
320                                                 &cancels, mode,
321                                                 MDS_INODELOCK_OPEN);
322         }
323
324         /* If CREATE, cancel parent's UPDATE lock. */
325         if (it->it_op & IT_CREAT)
326                 mode = LCK_EX;
327         else
328                 mode = LCK_CR;
329         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
330                                          &cancels, mode,
331                                          MDS_INODELOCK_UPDATE);
332
333         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
334                                    &RQF_LDLM_INTENT_OPEN);
335         if (req == NULL) {
336                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
337                 RETURN(ERR_PTR(-ENOMEM));
338         }
339
340         /* parent capability */
341         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
342         /* child capability, reserve the size according to parent capa, it will
343          * be filled after we get the reply */
344         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
345
346         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
347                              op_data->op_namelen + 1);
348         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
349                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
350
351         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
352         if (rc < 0) {
353                 ptlrpc_request_free(req);
354                 RETURN(ERR_PTR(rc));
355         }
356
357         spin_lock(&req->rq_lock);
358         req->rq_replay = req->rq_import->imp_replayable;
359         spin_unlock(&req->rq_lock);
360
361         /* pack the intent */
362         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363         lit->opc = (__u64)it->it_op;
364
365         /* pack the intended request */
366         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
367                       lmmsize);
368
369         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
370                              obddev->u.cli.cl_max_mds_easize);
371
372         /* for remote client, fetch remote perm for current user */
373         if (client_is_remote(exp))
374                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
375                                      sizeof(struct mdt_remote_perm));
376         ptlrpc_request_set_replen(req);
377         return req;
378 }
379
380 static struct ptlrpc_request *
381 mdc_intent_getxattr_pack(struct obd_export *exp,
382                          struct lookup_intent *it,
383                          struct md_op_data *op_data)
384 {
385         struct ptlrpc_request   *req;
386         struct ldlm_intent      *lit;
387         int                     rc, count = 0, maxdata;
388         CFS_LIST_HEAD(cancels);
389
390         ENTRY;
391
392         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
393                                         &RQF_LDLM_INTENT_GETXATTR);
394         if (req == NULL)
395                 RETURN(ERR_PTR(-ENOMEM));
396
397         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
398
399         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
400         if (rc) {
401                 ptlrpc_request_free(req);
402                 RETURN(ERR_PTR(rc));
403         }
404
405         /* pack the intent */
406         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
407         lit->opc = IT_GETXATTR;
408
409         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
410
411         /* pack the intended request */
412         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
413                         op_data->op_valid, maxdata, -1, 0);
414
415         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
416                                 RCL_SERVER, maxdata);
417
418         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
419                                 RCL_SERVER, maxdata);
420
421         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
422                                 RCL_SERVER, maxdata);
423
424         ptlrpc_request_set_replen(req);
425
426         RETURN(req);
427 }
428
429 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
430                                                      struct lookup_intent *it,
431                                                      struct md_op_data *op_data)
432 {
433         struct ptlrpc_request *req;
434         struct obd_device     *obddev = class_exp2obd(exp);
435         struct ldlm_intent    *lit;
436         int                    rc;
437         ENTRY;
438
439         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
440                                    &RQF_LDLM_INTENT_UNLINK);
441         if (req == NULL)
442                 RETURN(ERR_PTR(-ENOMEM));
443
444         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
445         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
446                              op_data->op_namelen + 1);
447
448         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 RETURN(ERR_PTR(rc));
452         }
453
454         /* pack the intent */
455         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
456         lit->opc = (__u64)it->it_op;
457
458         /* pack the intended request */
459         mdc_unlink_pack(req, op_data);
460
461         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
462                              obddev->u.cli.cl_max_mds_easize);
463         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
464                              obddev->u.cli.cl_max_mds_cookiesize);
465         ptlrpc_request_set_replen(req);
466         RETURN(req);
467 }
468
469 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
470                                                       struct lookup_intent *it,
471                                                       struct md_op_data *op_data)
472 {
473         struct ptlrpc_request *req;
474         struct obd_device     *obddev = class_exp2obd(exp);
475         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
476                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
477                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
478                                        (client_is_remote(exp) ?
479                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
480         struct ldlm_intent    *lit;
481         int                    rc;
482         ENTRY;
483
484         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
485                                    &RQF_LDLM_INTENT_GETATTR);
486         if (req == NULL)
487                 RETURN(ERR_PTR(-ENOMEM));
488
489         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
490         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
491                              op_data->op_namelen + 1);
492
493         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
494         if (rc) {
495                 ptlrpc_request_free(req);
496                 RETURN(ERR_PTR(rc));
497         }
498
499         /* pack the intent */
500         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
501         lit->opc = (__u64)it->it_op;
502
503         /* pack the intended request */
504         mdc_getattr_pack(req, valid, it->it_flags, op_data,
505                          obddev->u.cli.cl_max_mds_easize);
506
507         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
508                              obddev->u.cli.cl_max_mds_easize);
509         if (client_is_remote(exp))
510                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
511                                      sizeof(struct mdt_remote_perm));
512         ptlrpc_request_set_replen(req);
513         RETURN(req);
514 }
515
516 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
517                                                      struct lookup_intent *it,
518                                                      struct md_op_data *unused)
519 {
520         struct obd_device     *obd = class_exp2obd(exp);
521         struct ptlrpc_request *req;
522         struct ldlm_intent    *lit;
523         struct layout_intent  *layout;
524         int rc;
525         ENTRY;
526
527         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
528                                 &RQF_LDLM_INTENT_LAYOUT);
529         if (req == NULL)
530                 RETURN(ERR_PTR(-ENOMEM));
531
532         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
533         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
534         if (rc) {
535                 ptlrpc_request_free(req);
536                 RETURN(ERR_PTR(rc));
537         }
538
539         /* pack the intent */
540         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
541         lit->opc = (__u64)it->it_op;
542
543         /* pack the layout intent request */
544         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
545         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
546          * set for replication */
547         layout->li_opc = LAYOUT_INTENT_ACCESS;
548
549         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
550                         obd->u.cli.cl_max_mds_easize);
551         ptlrpc_request_set_replen(req);
552         RETURN(req);
553 }
554
555 static struct ptlrpc_request *
556 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
557 {
558         struct ptlrpc_request *req;
559         int rc;
560         ENTRY;
561
562         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
563         if (req == NULL)
564                 RETURN(ERR_PTR(-ENOMEM));
565
566         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
567         if (rc) {
568                 ptlrpc_request_free(req);
569                 RETURN(ERR_PTR(rc));
570         }
571
572         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
573         ptlrpc_request_set_replen(req);
574         RETURN(req);
575 }
576
/*
 * Post-process the reply of an intent enqueue.
 *
 * In order, this function:
 *  - flags the saved request as intent-only when it has a transno or is
 *    marked for replay;
 *  - clears the lock mode/handle when the enqueue was aborted, or adjusts
 *    the lock references when the lock's mode differs from the requested
 *    one;
 *  - copies disposition and status from the DLM reply into the intent and
 *    records lock mode, lock handle and the request itself there;
 *  - clears the replay flag for requests that must not be replayed;
 *  - checks that the reply fields announced by the mdt_body are present
 *    and, for replayable opens, copies the reply EA into the request;
 *  - for a layout lock, installs a private copy of the layout data as the
 *    lock's LVB if none is set yet.
 *
 * exp    export, used for EA size updates and the remote-client check
 * req    the enqueue request, with its reply
 * einfo  enqueue info; ei_mode is updated to the mode actually held
 *        (0 when no lock was granted)
 * it     intent being completed
 * lockh  lock handle from the enqueue; zeroed when the lock was aborted
 * rc     enqueue result, must be >= 0 (asserted)
 *
 * Returns rc (0 when rc was ELDLM_LOCK_ABORTED), -EPROTO when an expected
 * reply field is missing, -ENOMEM when the layout copy cannot be allocated.
 */
577 static int mdc_finish_enqueue(struct obd_export *exp,
578                               struct ptlrpc_request *req,
579                               struct ldlm_enqueue_info *einfo,
580                               struct lookup_intent *it,
581                               struct lustre_handle *lockh,
582                               int rc)
583 {
584         struct req_capsule  *pill = &req->rq_pill;
585         struct ldlm_request *lockreq;
586         struct ldlm_reply   *lockrep;
587         struct lustre_intent_data *intent = &it->d.lustre;
588         struct ldlm_lock    *lock;
589         void                *lvb_data = NULL;
590         int                  lvb_len = 0;
591         ENTRY;
592
593         LASSERT(rc >= 0);
594         /* Similarly, if we're going to replay this request, we don't want to
595          * actually get a lock, just perform the intent. */
596         if (req->rq_transno || req->rq_replay) {
597                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
598                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
599         }
600
            /* No lock was granted: report "no mode, no handle" and success. */
601         if (rc == ELDLM_LOCK_ABORTED) {
602                 einfo->ei_mode = 0;
603                 memset(lockh, 0, sizeof(*lockh));
604                 rc = 0;
605         } else { /* rc = 0 */
606                 lock = ldlm_handle2lock(lockh);
607                 LASSERT(lock != NULL);
608
609                 /* If the server gave us back a different lock mode, we should
610                  * fix up our variables. */
611                 if (lock->l_req_mode != einfo->ei_mode) {
612                         ldlm_lock_addref(lockh, lock->l_req_mode);
613                         ldlm_lock_decref(lockh, einfo->ei_mode);
614                         einfo->ei_mode = lock->l_req_mode;
615                 }
616                 LDLM_LOCK_PUT(lock);
617         }
618
619         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
620         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
621
            /* The two policy result words of the DLM reply carry the intent
             * disposition and status; the request is saved in it_data. */
622         intent->it_disposition = (int)lockrep->lock_policy_res1;
623         intent->it_status = (int)lockrep->lock_policy_res2;
624         intent->it_lock_mode = einfo->ei_mode;
625         intent->it_lock_handle = lockh->cookie;
626         intent->it_data = req;
627
628         /* Technically speaking rq_transno must already be zero if
629          * it_status is in error, so the check is a bit redundant */
630         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
631                 mdc_clear_replay_flag(req, intent->it_status);
632
633         /* If we're doing an IT_OPEN which did not result in an actual
634          * successful open, then we need to remove the bit which saves
635          * this request for unconditional replay.
636          *
637          * It's important that we do this first!  Otherwise we might exit the
638          * function without doing so, and try to replay a failed create
639          * (bug 3440) */
640         if (it->it_op & IT_OPEN && req->rq_replay &&
641             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
642                 mdc_clear_replay_flag(req, intent->it_status);
643
644         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
645                   it->it_op, intent->it_disposition, intent->it_status);
646
647         /* We know what to expect, so we do any byte flipping required here */
648         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
649                 struct mdt_body *body;
650
651                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
652                 if (body == NULL) {
653                         CERROR ("Can't swab mdt_body\n");
654                         RETURN (-EPROTO);
655                 }
656
657                 if (it_disposition(it, DISP_OPEN_OPEN) &&
658                     !it_open_error(DISP_OPEN_OPEN, it)) {
659                         /*
660                          * If this is a successful OPEN request, we need to set
661                          * replay handler and data early, so that if replay
662                          * happens immediately after swabbing below, new reply
663                          * is swabbed by that handler correctly.
664                          */
665                         mdc_set_open_replay_data(NULL, NULL, it);
666                 }
667
668                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
669                         void *eadata;
670
671                         mdc_update_max_ea_from_body(exp, body);
672
673                         /*
674                          * The eadata is opaque; just check that it is there.
675                          * Eventually, obd_unpackmd() will check the contents.
676                          */
677                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
678                                                               body->eadatasize);
679                         if (eadata == NULL)
680                                 RETURN(-EPROTO);
681
682                         /* save lvb data and length in case this is for layout
683                          * lock */
684                         lvb_data = eadata;
685                         lvb_len = body->eadatasize;
686
687                         /*
688                          * We save the reply LOV EA in case we have to replay a
689                          * create for recovery.  If we didn't allocate a large
690                          * enough request buffer above we need to reallocate it
691                          * here to hold the actual LOV EA.
692                          *
693                          * To not save LOV EA if request is not going to replay
694                          * (for example error one).
695                          */
696                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
697                                 void *lmm;
                                    /* Grow the request EA buffer if it is too
                                     * small, otherwise shrink it to fit.  A
                                     * failed grow zeroes body->eadatasize, so
                                     * the set_size/memcpy below then use 0. */
698                                 if (req_capsule_get_size(pill, &RMF_EADATA,
699                                                          RCL_CLIENT) <
700                                     body->eadatasize)
701                                         mdc_realloc_openmsg(req, body);
702                                 else
703                                         req_capsule_shrink(pill, &RMF_EADATA,
704                                                            body->eadatasize,
705                                                            RCL_CLIENT);
706
707                                 req_capsule_set_size(pill, &RMF_EADATA,
708                                                      RCL_CLIENT,
709                                                      body->eadatasize);
710
711                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
712                                 if (lmm)
713                                         memcpy(lmm, eadata, body->eadatasize);
714                         }
715                 }
716
                    /* The remaining checks only verify that a reply field the
                     * body announces is really there; a missing one is a
                     * protocol error. */
717                 if (body->valid & OBD_MD_FLRMTPERM) {
718                         struct mdt_remote_perm *perm;
719
720                         LASSERT(client_is_remote(exp));
721                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
722                                                 lustre_swab_mdt_remote_perm);
723                         if (perm == NULL)
724                                 RETURN(-EPROTO);
725                 }
726                 if (body->valid & OBD_MD_FLMDSCAPA) {
727                         struct lustre_capa *capa, *p;
728
729                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
730                         if (capa == NULL)
731                                 RETURN(-EPROTO);
732
733                         if (it->it_op & IT_OPEN) {
734                                 /* client fid capa will be checked in replay */
735                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
736                                 LASSERT(p);
737                                 *p = *capa;
738                         }
739                 }
740                 if (body->valid & OBD_MD_FLOSSCAPA) {
741                         struct lustre_capa *capa;
742
743                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
744                         if (capa == NULL)
745                                 RETURN(-EPROTO);
746                 }
747         } else if (it->it_op & IT_LAYOUT) {
748                 /* maybe the lock was granted right away and layout
749                  * is packed into RMF_DLM_LVB of req */
750                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
751                 if (lvb_len > 0) {
752                         lvb_data = req_capsule_server_sized_get(pill,
753                                                         &RMF_DLM_LVB, lvb_len);
754                         if (lvb_data == NULL)
755                                 RETURN(-EPROTO);
756                 }
757         }
758
759         /* fill in stripe data for layout lock */
            /* lock may be NULL here, e.g. after lockh was zeroed above */
760         lock = ldlm_handle2lock(lockh);
761         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
762                 void *lmm;
763
                    /* NOTE(review): the format string ends in "\n"; confirm
                     * whether LDLM_DEBUG already terminates the line. */
764                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
765                         ldlm_it2str(it->it_op), lvb_len);
766
                    /* Copy the layout out of the reply buffer so the lock
                     * holds its own allocation. */
767                 OBD_ALLOC_LARGE(lmm, lvb_len);
768                 if (lmm == NULL) {
769                         LDLM_LOCK_PUT(lock);
770                         RETURN(-ENOMEM);
771                 }
772                 memcpy(lmm, lvb_data, lvb_len);
773
774                 /* install lvb_data */
775                 lock_res_and_lock(lock);
776                 if (lock->l_lvb_data == NULL) {
777                         lock->l_lvb_type = LVB_T_LAYOUT;
778                         lock->l_lvb_data = lmm;
779                         lock->l_lvb_len = lvb_len;
                            /* ownership moved to the lock */
780                         lmm = NULL;
781                 }
782                 unlock_res_and_lock(lock);
                    /* l_lvb_data was already set, so our copy was not
                     * installed: release it */
783                 if (lmm != NULL)
784                         OBD_FREE_LARGE(lmm, lvb_len);
785         }
786         if (lock != NULL)
787                 LDLM_LOCK_PUT(lock);
788
789         RETURN(rc);
790 }
791
792 /* We always reserve enough space in the reply packet for a stripe MD, because
793  * we don't know in advance the file type. */
794 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
795                 struct lookup_intent *it, struct md_op_data *op_data,
796                 struct lustre_handle *lockh, void *lmm, int lmmsize,
797                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
798 {
799         struct obd_device     *obddev = class_exp2obd(exp);
800         struct ptlrpc_request *req = NULL;
801         __u64                  flags, saved_flags = extra_lock_flags;
802         int                    rc;
803         struct ldlm_res_id res_id;
804         static const ldlm_policy_data_t lookup_policy =
805                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
806         static const ldlm_policy_data_t update_policy =
807                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
808         static const ldlm_policy_data_t layout_policy =
809                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
810         static const ldlm_policy_data_t getxattr_policy = {
811                               .l_inodebits = { MDS_INODELOCK_XATTR } };
812         ldlm_policy_data_t const *policy = &lookup_policy;
813         int                    generation, resends = 0;
814         struct ldlm_reply     *lockrep;
815         enum lvb_type          lvb_type = 0;
816         ENTRY;
817
818         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
819                  einfo->ei_type);
820
821         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
822
823         if (it) {
824                 saved_flags |= LDLM_FL_HAS_INTENT;
825                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
826                         policy = &update_policy;
827                 else if (it->it_op & IT_LAYOUT)
828                         policy = &layout_policy;
829                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
830                         policy = &getxattr_policy;
831         }
832
833         LASSERT(reqp == NULL);
834
835         generation = obddev->u.cli.cl_import->imp_generation;
836 resend:
837         flags = saved_flags;
838         if (!it) {
839                 /* The only way right now is FLOCK, in this case we hide flock
840                    policy as lmm, but lmmsize is 0 */
841                 LASSERT(lmm && lmmsize == 0);
842                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
843                          einfo->ei_type);
844                 policy = (ldlm_policy_data_t *)lmm;
845                 res_id.name[3] = LDLM_FLOCK;
846         } else if (it->it_op & IT_OPEN) {
847                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
848                                            einfo->ei_cbdata);
849                 policy = &update_policy;
850                 einfo->ei_cbdata = NULL;
851                 lmm = NULL;
852         } else if (it->it_op & IT_UNLINK) {
853                 req = mdc_intent_unlink_pack(exp, it, op_data);
854         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
855                 req = mdc_intent_getattr_pack(exp, it, op_data);
856         } else if (it->it_op & IT_READDIR) {
857                 req = mdc_enqueue_pack(exp, 0);
858         } else if (it->it_op & IT_LAYOUT) {
859                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
860                         RETURN(-EOPNOTSUPP);
861                 req = mdc_intent_layout_pack(exp, it, op_data);
862                 lvb_type = LVB_T_LAYOUT;
863         } else if (it->it_op & IT_GETXATTR) {
864                 req = mdc_intent_getxattr_pack(exp, it, op_data);
865         } else {
866                 LBUG();
867                 RETURN(-EINVAL);
868         }
869
870         if (IS_ERR(req))
871                 RETURN(PTR_ERR(req));
872
873         if (req != NULL && it && it->it_op & IT_CREAT)
874                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
875                  * retry logic */
876                 req->rq_no_retry_einprogress = 1;
877
878         if (resends) {
879                 req->rq_generation_set = 1;
880                 req->rq_import_generation = generation;
881                 req->rq_sent = cfs_time_current_sec() + resends;
882         }
883
884         /* It is important to obtain rpc_lock first (if applicable), so that
885          * threads that are serialised with rpc_lock are not polluting our
886          * rpcs in flight counter. We do not do flock request limiting, though*/
887         if (it) {
888                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
889                 rc = mdc_enter_request(&obddev->u.cli);
890                 if (rc != 0) {
891                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
892                         mdc_clear_replay_flag(req, 0);
893                         ptlrpc_req_finished(req);
894                         RETURN(rc);
895                 }
896         }
897
898         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
899                               0, lvb_type, lockh, 0);
900         if (!it) {
901                 /* For flock requests we immediatelly return without further
902                    delay and let caller deal with the rest, since rest of
903                    this function metadata processing makes no sense for flock
904                    requests anyway. But in case of problem during comms with
905                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
906                    can not rely on caller and this mainly for F_UNLCKs
907                    (explicits or automatically generated by Kernel to clean
908                    current FLocks upon exit) that can't be trashed */
909                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
910                     (einfo->ei_type == LDLM_FLOCK) &&
911                     (einfo->ei_mode == LCK_NL))
912                         goto resend;
913                 RETURN(rc);
914         }
915
916         mdc_exit_request(&obddev->u.cli);
917         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
918
919         if (rc < 0) {
920                 CERROR("ldlm_cli_enqueue: %d\n", rc);
921                 mdc_clear_replay_flag(req, rc);
922                 ptlrpc_req_finished(req);
923                 RETURN(rc);
924         }
925
926         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
927         LASSERT(lockrep != NULL);
928
929         lockrep->lock_policy_res2 =
930                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
931
932         /* Retry the create infinitely when we get -EINPROGRESS from
933          * server. This is required by the new quota design. */
934         if (it && it->it_op & IT_CREAT &&
935             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
936                 mdc_clear_replay_flag(req, rc);
937                 ptlrpc_req_finished(req);
938                 resends++;
939
940                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
941                        obddev->obd_name, resends, it->it_op,
942                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
943
944                 if (generation == obddev->u.cli.cl_import->imp_generation) {
945                         goto resend;
946                 } else {
947                         CDEBUG(D_HA, "resend cross eviction\n");
948                         RETURN(-EIO);
949                 }
950         }
951
952         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
953         if (rc < 0) {
954                 if (lustre_handle_is_used(lockh)) {
955                         ldlm_lock_decref(lockh, einfo->ei_mode);
956                         memset(lockh, 0, sizeof(*lockh));
957                 }
958                 ptlrpc_req_finished(req);
959         }
960         RETURN(rc);
961 }
962
963 static int mdc_finish_intent_lock(struct obd_export *exp,
964                                   struct ptlrpc_request *request,
965                                   struct md_op_data *op_data,
966                                   struct lookup_intent *it,
967                                   struct lustre_handle *lockh)
968 {
969         struct lustre_handle old_lock;
970         struct mdt_body *mdt_body;
971         struct ldlm_lock *lock;
972         int rc;
973         ENTRY;
974
975         LASSERT(request != NULL);
976         LASSERT(request != LP_POISON);
977         LASSERT(request->rq_repmsg != LP_POISON);
978
979         if (it->it_op & IT_READDIR)
980                 RETURN(0);
981
982         if (!it_disposition(it, DISP_IT_EXECD)) {
983                 /* The server failed before it even started executing the
984                  * intent, i.e. because it couldn't unpack the request. */
985                 LASSERT(it->d.lustre.it_status != 0);
986                 RETURN(it->d.lustre.it_status);
987         }
988         rc = it_open_error(DISP_IT_EXECD, it);
989         if (rc)
990                 RETURN(rc);
991
992         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
993         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
994
995         /* If we were revalidating a fid/name pair, mark the intent in
996          * case we fail and get called again from lookup */
997         if (fid_is_sane(&op_data->op_fid2) &&
998             it->it_create_mode & M_CHECK_STALE &&
999             it->it_op != IT_GETATTR) {
1000                 /* Also: did we find the same inode? */
1001                 /* sever can return one of two fids:
1002                  * op_fid2 - new allocated fid - if file is created.
1003                  * op_fid3 - existent fid - if file only open.
1004                  * op_fid3 is saved in lmv_intent_open */
1005                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1006                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1007                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1008                                "\n", PFID(&op_data->op_fid2),
1009                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1010                         RETURN(-ESTALE);
1011                 }
1012         }
1013
1014         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1015         if (rc)
1016                 RETURN(rc);
1017
1018         /* keep requests around for the multiple phases of the call
1019          * this shows the DISP_XX must guarantee we make it into the call
1020          */
1021         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1022             it_disposition(it, DISP_OPEN_CREATE) &&
1023             !it_open_error(DISP_OPEN_CREATE, it)) {
1024                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1025                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1026         }
1027         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1028             it_disposition(it, DISP_OPEN_OPEN) &&
1029             !it_open_error(DISP_OPEN_OPEN, it)) {
1030                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1031                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1032                 /* BUG 11546 - eviction in the middle of open rpc processing */
1033                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1034         }
1035
1036         if (it->it_op & IT_CREAT) {
1037                 /* XXX this belongs in ll_create_it */
1038         } else if (it->it_op == IT_OPEN) {
1039                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1040         } else {
1041                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1042         }
1043
1044         /* If we already have a matching lock, then cancel the new
1045          * one.  We have to set the data here instead of in
1046          * mdc_enqueue, because we need to use the child's inode as
1047          * the l_ast_data to match, and that's not available until
1048          * intent_finish has performed the iget().) */
1049         lock = ldlm_handle2lock(lockh);
1050         if (lock) {
1051                 ldlm_policy_data_t policy = lock->l_policy_data;
1052                 LDLM_DEBUG(lock, "matching against this");
1053
1054                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1055                                          &lock->l_resource->lr_name),
1056                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1057                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1058                 LDLM_LOCK_PUT(lock);
1059
1060                 memcpy(&old_lock, lockh, sizeof(*lockh));
1061                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1062                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1063                         ldlm_lock_decref_and_cancel(lockh,
1064                                                     it->d.lustre.it_lock_mode);
1065                         memcpy(lockh, &old_lock, sizeof(old_lock));
1066                         it->d.lustre.it_lock_handle = lockh->cookie;
1067                 }
1068         }
1069         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1070                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1071                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1072         RETURN(rc);
1073 }
1074
1075 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1076                         struct lu_fid *fid, __u64 *bits)
1077 {
1078         /* We could just return 1 immediately, but since we should only
1079          * be called in revalidate_it if we already have a lock, let's
1080          * verify that. */
1081         struct ldlm_res_id res_id;
1082         struct lustre_handle lockh;
1083         ldlm_policy_data_t policy;
1084         ldlm_mode_t mode;
1085         ENTRY;
1086
1087         if (it->d.lustre.it_lock_handle) {
1088                 lockh.cookie = it->d.lustre.it_lock_handle;
1089                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1090         } else {
1091                 fid_build_reg_res_name(fid, &res_id);
1092                 switch (it->it_op) {
1093                 case IT_GETATTR:
1094                         /* File attributes are held under multiple bits:
1095                          * nlink is under lookup lock, size and times are
1096                          * under UPDATE lock and recently we've also got
1097                          * a separate permissions lock for owner/group/acl that
1098                          * were protected by lookup lock before.
1099                          * Getattr must provide all of that information,
1100                          * so we need to ensure we have all of those locks.
1101                          * Unfortunately, if the bits are split across multiple
1102                          * locks, there's no easy way to match all of them here,
1103                          * so an extra RPC would be performed to fetch all
1104                          * of those bits at once for now. */
1105                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1106                          * but for old MDTs (< 2.4), permission is covered
1107                          * by LOOKUP lock, so it needs to match all bits here.*/
1108                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1109                                                   MDS_INODELOCK_LOOKUP |
1110                                                   MDS_INODELOCK_PERM;
1111                         break;
1112                 case IT_READDIR:
1113                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1114                         break;
1115                 case IT_LAYOUT:
1116                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1117                         break;
1118                 default:
1119                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1120                         break;
1121                 }
1122
1123                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1124                                       LDLM_IBITS, &policy,
1125                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1126                                       &lockh);
1127         }
1128
1129         if (mode) {
1130                 it->d.lustre.it_lock_handle = lockh.cookie;
1131                 it->d.lustre.it_lock_mode = mode;
1132         } else {
1133                 it->d.lustre.it_lock_handle = 0;
1134                 it->d.lustre.it_lock_mode = 0;
1135         }
1136
1137         RETURN(!!mode);
1138 }
1139
1140 /*
1141  * This long block is all about fixing up the lock and request state
1142  * so that it is correct as of the moment _before_ the operation was
1143  * applied; that way, the VFS will think that everything is normal and
1144  * call Lustre's regular VFS methods.
1145  *
1146  * If we're performing a creation, that means that unless the creation
1147  * failed with EEXIST, we should fake up a negative dentry.
1148  *
1149  * For everything else, we want to lookup to succeed.
1150  *
1151  * One additional note: if CREATE or OPEN succeeded, we add an extra
1152  * reference to the request because we need to keep it around until
1153  * ll_create/ll_open gets called.
1154  *
1155  * The server will return to us, in it_disposition, an indication of
1156  * exactly what d.lustre.it_status refers to.
1157  *
1158  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1159  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1160  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1161  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1162  * was successful.
1163  *
1164  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1165  * child lookup.
1166  */
1167 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1168                     void *lmm, int lmmsize, struct lookup_intent *it,
1169                     int lookup_flags, struct ptlrpc_request **reqp,
1170                     ldlm_blocking_callback cb_blocking,
1171                     __u64 extra_lock_flags)
1172 {
1173         struct ldlm_enqueue_info einfo = {
1174                 .ei_type        = LDLM_IBITS,
1175                 .ei_mode        = it_to_lock_mode(it),
1176                 .ei_cb_bl       = cb_blocking,
1177                 .ei_cb_cp       = ldlm_completion_ast,
1178         };
1179         struct lustre_handle lockh;
1180         int rc = 0;
1181         ENTRY;
1182         LASSERT(it);
1183
1184         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1185                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1186                 op_data->op_name, PFID(&op_data->op_fid2),
1187                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1188                 it->it_flags);
1189
1190         lockh.cookie = 0;
1191         if (fid_is_sane(&op_data->op_fid2) &&
1192             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1193                 /* We could just return 1 immediately, but since we should only
1194                  * be called in revalidate_it if we already have a lock, let's
1195                  * verify that. */
1196                 it->d.lustre.it_lock_handle = 0;
1197                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1198                 /* Only return failure if it was not GETATTR by cfid
1199                    (from inode_revalidate) */
1200                 if (rc || op_data->op_namelen != 0)
1201                         RETURN(rc);
1202         }
1203
1204         /* For case if upper layer did not alloc fid, do it now. */
1205         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1206                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1207                 if (rc < 0) {
1208                         CERROR("Can't alloc new fid, rc %d\n", rc);
1209                         RETURN(rc);
1210                 }
1211         }
1212         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1213                          extra_lock_flags);
1214         if (rc < 0)
1215                 RETURN(rc);
1216
1217         *reqp = it->d.lustre.it_data;
1218         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1219         RETURN(rc);
1220 }
1221
1222 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1223                                               struct ptlrpc_request *req,
1224                                               void *args, int rc)
1225 {
1226         struct mdc_getattr_args  *ga = args;
1227         struct obd_export        *exp = ga->ga_exp;
1228         struct md_enqueue_info   *minfo = ga->ga_minfo;
1229         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1230         struct lookup_intent     *it;
1231         struct lustre_handle     *lockh;
1232         struct obd_device        *obddev;
1233         struct ldlm_reply        *lockrep;
1234         __u64                     flags = LDLM_FL_HAS_INTENT;
1235         ENTRY;
1236
1237         it    = &minfo->mi_it;
1238         lockh = &minfo->mi_lockh;
1239
1240         obddev = class_exp2obd(exp);
1241
1242         mdc_exit_request(&obddev->u.cli);
1243         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1244                 rc = -ETIMEDOUT;
1245
1246         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1247                                    &flags, NULL, 0, lockh, rc);
1248         if (rc < 0) {
1249                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1250                 mdc_clear_replay_flag(req, rc);
1251                 GOTO(out, rc);
1252         }
1253
1254         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1255         LASSERT(lockrep != NULL);
1256
1257         lockrep->lock_policy_res2 =
1258                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1259
1260         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1261         if (rc)
1262                 GOTO(out, rc);
1263
1264         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1265         EXIT;
1266
1267 out:
1268         OBD_FREE_PTR(einfo);
1269         minfo->mi_cb(req, minfo, rc);
1270         return 0;
1271 }
1272
1273 int mdc_intent_getattr_async(struct obd_export *exp,
1274                              struct md_enqueue_info *minfo,
1275                              struct ldlm_enqueue_info *einfo)
1276 {
1277         struct md_op_data       *op_data = &minfo->mi_data;
1278         struct lookup_intent    *it = &minfo->mi_it;
1279         struct ptlrpc_request   *req;
1280         struct mdc_getattr_args *ga;
1281         struct obd_device       *obddev = class_exp2obd(exp);
1282         struct ldlm_res_id       res_id;
1283         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1284          *     for statahead currently. Consider CMD in future, such two bits
1285          *     maybe managed by different MDS, should be adjusted then. */
1286         ldlm_policy_data_t       policy = {
1287                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1288                                                          MDS_INODELOCK_UPDATE }
1289                                  };
1290         int                      rc = 0;
1291         __u64                    flags = LDLM_FL_HAS_INTENT;
1292         ENTRY;
1293
1294         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1295                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1296                 ldlm_it2str(it->it_op), it->it_flags);
1297
1298         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1299         req = mdc_intent_getattr_pack(exp, it, op_data);
1300         if (IS_ERR(req))
1301                 RETURN(PTR_ERR(req));
1302
1303         rc = mdc_enter_request(&obddev->u.cli);
1304         if (rc != 0) {
1305                 ptlrpc_req_finished(req);
1306                 RETURN(rc);
1307         }
1308
1309         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1310                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1311         if (rc < 0) {
1312                 mdc_exit_request(&obddev->u.cli);
1313                 ptlrpc_req_finished(req);
1314                 RETURN(rc);
1315         }
1316
1317         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1318         ga = ptlrpc_req_async_args(req);
1319         ga->ga_exp = exp;
1320         ga->ga_minfo = minfo;
1321         ga->ga_einfo = einfo;
1322
1323         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1324         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1325
1326         RETURN(0);
1327 }