Whamcloud - gitweb
LU-3531 llite: move dir cache to MDC layer
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Bundle of pointers describing one getattr enqueue.
 * NOTE(review): presumably carried as the callback argument of an
 * asynchronous intent getattr -- confirm against the users of this struct.
 */
struct mdc_getattr_args {
        /* export associated with the enqueue */
        struct obd_export        *ga_exp;
        /* metadata-level enqueue description */
        struct md_enqueue_info   *ga_minfo;
        /* DLM-level enqueue description */
        struct ldlm_enqueue_info *ga_einfo;
};
60
61 int it_disposition(struct lookup_intent *it, int flag)
62 {
63         return it->d.lustre.it_disposition & flag;
64 }
65 EXPORT_SYMBOL(it_disposition);
66
67 void it_set_disposition(struct lookup_intent *it, int flag)
68 {
69         it->d.lustre.it_disposition |= flag;
70 }
71 EXPORT_SYMBOL(it_set_disposition);
72
73 void it_clear_disposition(struct lookup_intent *it, int flag)
74 {
75         it->d.lustre.it_disposition &= ~flag;
76 }
77 EXPORT_SYMBOL(it_clear_disposition);
78
79 int it_open_error(int phase, struct lookup_intent *it)
80 {
81         if (it_disposition(it, DISP_OPEN_LEASE)) {
82                 if (phase >= DISP_OPEN_LEASE)
83                         return it->d.lustre.it_status;
84                 else
85                         return 0;
86         }
87         if (it_disposition(it, DISP_OPEN_OPEN)) {
88                 if (phase >= DISP_OPEN_OPEN)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93
94         if (it_disposition(it, DISP_OPEN_CREATE)) {
95                 if (phase >= DISP_OPEN_CREATE)
96                         return it->d.lustre.it_status;
97                 else
98                         return 0;
99         }
100
101         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102                 if (phase >= DISP_LOOKUP_EXECD)
103                         return it->d.lustre.it_status;
104                 else
105                         return 0;
106         }
107
108         if (it_disposition(it, DISP_IT_EXECD)) {
109                 if (phase >= DISP_IT_EXECD)
110                         return it->d.lustre.it_status;
111                 else
112                         return 0;
113         }
114         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115                it->d.lustre.it_status);
116         LBUG();
117         return 0;
118 }
119 EXPORT_SYMBOL(it_open_error);
120
121 /* this must be called on a lockh that is known to have a referenced lock */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123                       __u64 *bits)
124 {
125         struct ldlm_lock *lock;
126         struct inode *new_inode = data;
127         ENTRY;
128
129         if(bits)
130                 *bits = 0;
131
132         if (!*lockh)
133                 RETURN(0);
134
135         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
136
137         LASSERT(lock != NULL);
138         lock_res_and_lock(lock);
139 #ifdef __KERNEL__
140         if (lock->l_resource->lr_lvb_inode &&
141             lock->l_resource->lr_lvb_inode != data) {
142                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143                 LASSERTF(old_inode->i_state & I_FREEING,
144                          "Found existing inode %p/%lu/%u state %lu in lock: "
145                          "setting data to %p/%lu/%u\n", old_inode,
146                          old_inode->i_ino, old_inode->i_generation,
147                          old_inode->i_state,
148                          new_inode, new_inode->i_ino, new_inode->i_generation);
149         }
150 #endif
151         lock->l_resource->lr_lvb_inode = new_inode;
152         if (bits)
153                 *bits = lock->l_policy_data.l_inodebits.bits;
154
155         unlock_res_and_lock(lock);
156         LDLM_LOCK_PUT(lock);
157
158         RETURN(0);
159 }
160
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162                            const struct lu_fid *fid, ldlm_type_t type,
163                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
164                            struct lustre_handle *lockh)
165 {
166         struct ldlm_res_id res_id;
167         ldlm_mode_t rc;
168         ENTRY;
169
170         fid_build_reg_res_name(fid, &res_id);
171         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
172                              &res_id, type, policy, mode, lockh, 0);
173         RETURN(rc);
174 }
175
176 int mdc_cancel_unused(struct obd_export *exp,
177                       const struct lu_fid *fid,
178                       ldlm_policy_data_t *policy,
179                       ldlm_mode_t mode,
180                       ldlm_cancel_flags_t flags,
181                       void *opaque)
182 {
183         struct ldlm_res_id res_id;
184         struct obd_device *obd = class_exp2obd(exp);
185         int rc;
186
187         ENTRY;
188
189         fid_build_reg_res_name(fid, &res_id);
190         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
191                                              policy, mode, flags, opaque);
192         RETURN(rc);
193 }
194
195 int mdc_null_inode(struct obd_export *exp,
196                    const struct lu_fid *fid)
197 {
198         struct ldlm_res_id res_id;
199         struct ldlm_resource *res;
200         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
201         ENTRY;
202
203         LASSERTF(ns != NULL, "no namespace passed\n");
204
205         fid_build_reg_res_name(fid, &res_id);
206
207         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
208         if(res == NULL)
209                 RETURN(0);
210
211         lock_res(res);
212         res->lr_lvb_inode = NULL;
213         unlock_res(res);
214
215         ldlm_resource_putref(res);
216         RETURN(0);
217 }
218
219 /* find any ldlm lock of the inode in mdc
220  * return 0    not find
221  *        1    find one
222  *      < 0    error */
223 int mdc_find_cbdata(struct obd_export *exp,
224                     const struct lu_fid *fid,
225                     ldlm_iterator_t it, void *data)
226 {
227         struct ldlm_res_id res_id;
228         int rc = 0;
229         ENTRY;
230
231         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
232         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
233                                    it, data);
234         if (rc == LDLM_ITER_STOP)
235                 RETURN(1);
236         else if (rc == LDLM_ITER_CONTINUE)
237                 RETURN(0);
238         RETURN(rc);
239 }
240
241 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
242 {
243         /* Don't hold error requests for replay. */
244         if (req->rq_replay) {
245                 spin_lock(&req->rq_lock);
246                 req->rq_replay = 0;
247                 spin_unlock(&req->rq_lock);
248         }
249         if (rc && req->rq_transno != 0) {
250                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
251                 LBUG();
252         }
253 }
254
255 /* Save a large LOV EA into the request buffer so that it is available
256  * for replay.  We don't do this in the initial request because the
257  * original request doesn't need this buffer (at most it sends just the
258  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
259  * buffer and may also be difficult to allocate and save a very large
260  * request buffer for each open. (bug 5707)
261  *
262  * OOM here may cause recovery failure if lmm is needed (only for the
263  * original open if the MDS crashed just when this client also OOM'd)
264  * but this is incredibly unlikely, and questionable whether the client
265  * could do MDS recovery under OOM anyways... */
266 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
267                                 struct mdt_body *body)
268 {
269         int     rc;
270
271         /* FIXME: remove this explicit offset. */
272         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
273                                         body->eadatasize);
274         if (rc) {
275                 CERROR("Can't enlarge segment %d size to %d\n",
276                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
277                 body->valid &= ~OBD_MD_FLEASIZE;
278                 body->eadatasize = 0;
279         }
280 }
281
282 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
283                                                    struct lookup_intent *it,
284                                                    struct md_op_data *op_data,
285                                                    void *lmm, int lmmsize,
286                                                    void *cb_data)
287 {
288         struct ptlrpc_request *req;
289         struct obd_device     *obddev = class_exp2obd(exp);
290         struct ldlm_intent    *lit;
291         CFS_LIST_HEAD(cancels);
292         int                    count = 0;
293         int                    mode;
294         int                    rc;
295         ENTRY;
296
297         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
298
299         /* XXX: openlock is not cancelled for cross-refs. */
300         /* If inode is known, cancel conflicting OPEN locks. */
301         if (fid_is_sane(&op_data->op_fid2)) {
302                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
303                         if (it->it_flags & FMODE_WRITE)
304                                 mode = LCK_EX;
305                         else
306                                 mode = LCK_PR;
307                 } else {
308                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
309                                 mode = LCK_CW;
310 #ifdef FMODE_EXEC
311                         else if (it->it_flags & FMODE_EXEC)
312                                 mode = LCK_PR;
313 #endif
314                         else
315                                 mode = LCK_CR;
316                 }
317                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
318                                                 &cancels, mode,
319                                                 MDS_INODELOCK_OPEN);
320         }
321
322         /* If CREATE, cancel parent's UPDATE lock. */
323         if (it->it_op & IT_CREAT)
324                 mode = LCK_EX;
325         else
326                 mode = LCK_CR;
327         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
328                                          &cancels, mode,
329                                          MDS_INODELOCK_UPDATE);
330
331         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
332                                    &RQF_LDLM_INTENT_OPEN);
333         if (req == NULL) {
334                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
335                 RETURN(ERR_PTR(-ENOMEM));
336         }
337
338         /* parent capability */
339         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
340         /* child capability, reserve the size according to parent capa, it will
341          * be filled after we get the reply */
342         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
343
344         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
345                              op_data->op_namelen + 1);
346         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
347                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
348
349         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
350         if (rc < 0) {
351                 ptlrpc_request_free(req);
352                 RETURN(ERR_PTR(rc));
353         }
354
355         spin_lock(&req->rq_lock);
356         req->rq_replay = req->rq_import->imp_replayable;
357         spin_unlock(&req->rq_lock);
358
359         /* pack the intent */
360         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
361         lit->opc = (__u64)it->it_op;
362
363         /* pack the intended request */
364         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
365                       lmmsize);
366
367         /* for remote client, fetch remote perm for current user */
368         if (client_is_remote(exp))
369                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
370                                      sizeof(struct mdt_remote_perm));
371         ptlrpc_request_set_replen(req);
372         return req;
373 }
374
375 static struct ptlrpc_request *
376 mdc_intent_getxattr_pack(struct obd_export *exp,
377                          struct lookup_intent *it,
378                          struct md_op_data *op_data)
379 {
380         struct ptlrpc_request   *req;
381         struct ldlm_intent      *lit;
382         int                     rc, count = 0, maxdata;
383         CFS_LIST_HEAD(cancels);
384
385         ENTRY;
386
387         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388                                         &RQF_LDLM_INTENT_GETXATTR);
389         if (req == NULL)
390                 RETURN(ERR_PTR(-ENOMEM));
391
392         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393
394         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
395         if (rc) {
396                 ptlrpc_request_free(req);
397                 RETURN(ERR_PTR(rc));
398         }
399
400         /* pack the intent */
401         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
402         lit->opc = IT_GETXATTR;
403
404         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
405
406         /* pack the intended request */
407         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
408                         op_data->op_valid, maxdata, -1, 0);
409
410         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
411                                 RCL_SERVER, maxdata);
412
413         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
414                                 RCL_SERVER, maxdata);
415
416         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
417                                 RCL_SERVER, maxdata);
418
419         ptlrpc_request_set_replen(req);
420
421         RETURN(req);
422 }
423
424 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
425                                                      struct lookup_intent *it,
426                                                      struct md_op_data *op_data)
427 {
428         struct ptlrpc_request *req;
429         struct obd_device     *obddev = class_exp2obd(exp);
430         struct ldlm_intent    *lit;
431         int                    rc;
432         ENTRY;
433
434         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
435                                    &RQF_LDLM_INTENT_UNLINK);
436         if (req == NULL)
437                 RETURN(ERR_PTR(-ENOMEM));
438
439         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
440         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
441                              op_data->op_namelen + 1);
442
443         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
444         if (rc) {
445                 ptlrpc_request_free(req);
446                 RETURN(ERR_PTR(rc));
447         }
448
449         /* pack the intent */
450         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
451         lit->opc = (__u64)it->it_op;
452
453         /* pack the intended request */
454         mdc_unlink_pack(req, op_data);
455
456         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
457                              obddev->u.cli.cl_max_mds_easize);
458         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
459                              obddev->u.cli.cl_max_mds_cookiesize);
460         ptlrpc_request_set_replen(req);
461         RETURN(req);
462 }
463
464 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
465                                                       struct lookup_intent *it,
466                                                       struct md_op_data *op_data)
467 {
468         struct ptlrpc_request *req;
469         struct obd_device     *obddev = class_exp2obd(exp);
470         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
471                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
472                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
473                                        (client_is_remote(exp) ?
474                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
475         struct ldlm_intent    *lit;
476         int                    rc;
477         ENTRY;
478
479         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
480                                    &RQF_LDLM_INTENT_GETATTR);
481         if (req == NULL)
482                 RETURN(ERR_PTR(-ENOMEM));
483
484         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
485         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
486                              op_data->op_namelen + 1);
487
488         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
489         if (rc) {
490                 ptlrpc_request_free(req);
491                 RETURN(ERR_PTR(rc));
492         }
493
494         /* pack the intent */
495         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
496         lit->opc = (__u64)it->it_op;
497
498         /* pack the intended request */
499         mdc_getattr_pack(req, valid, it->it_flags, op_data,
500                          obddev->u.cli.cl_max_mds_easize);
501
502         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
503                              obddev->u.cli.cl_max_mds_easize);
504         if (client_is_remote(exp))
505                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
506                                      sizeof(struct mdt_remote_perm));
507         ptlrpc_request_set_replen(req);
508         RETURN(req);
509 }
510
511 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
512                                                      struct lookup_intent *it,
513                                                      struct md_op_data *unused)
514 {
515         struct obd_device     *obd = class_exp2obd(exp);
516         struct ptlrpc_request *req;
517         struct ldlm_intent    *lit;
518         struct layout_intent  *layout;
519         int rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
523                                 &RQF_LDLM_INTENT_LAYOUT);
524         if (req == NULL)
525                 RETURN(ERR_PTR(-ENOMEM));
526
527         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
528         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
529         if (rc) {
530                 ptlrpc_request_free(req);
531                 RETURN(ERR_PTR(rc));
532         }
533
534         /* pack the intent */
535         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
536         lit->opc = (__u64)it->it_op;
537
538         /* pack the layout intent request */
539         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
540         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
541          * set for replication */
542         layout->li_opc = LAYOUT_INTENT_ACCESS;
543
544         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
545                         obd->u.cli.cl_max_mds_easize);
546         ptlrpc_request_set_replen(req);
547         RETURN(req);
548 }
549
550 static struct ptlrpc_request *
551 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
552 {
553         struct ptlrpc_request *req;
554         int rc;
555         ENTRY;
556
557         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
558         if (req == NULL)
559                 RETURN(ERR_PTR(-ENOMEM));
560
561         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
562         if (rc) {
563                 ptlrpc_request_free(req);
564                 RETURN(ERR_PTR(rc));
565         }
566
567         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
568         ptlrpc_request_set_replen(req);
569         RETURN(req);
570 }
571
572 static int mdc_finish_enqueue(struct obd_export *exp,
573                               struct ptlrpc_request *req,
574                               struct ldlm_enqueue_info *einfo,
575                               struct lookup_intent *it,
576                               struct lustre_handle *lockh,
577                               int rc)
578 {
579         struct req_capsule  *pill = &req->rq_pill;
580         struct ldlm_request *lockreq;
581         struct ldlm_reply   *lockrep;
582         struct lustre_intent_data *intent = &it->d.lustre;
583         struct ldlm_lock    *lock;
584         void                *lvb_data = NULL;
585         int                  lvb_len = 0;
586         ENTRY;
587
588         LASSERT(rc >= 0);
589         /* Similarly, if we're going to replay this request, we don't want to
590          * actually get a lock, just perform the intent. */
591         if (req->rq_transno || req->rq_replay) {
592                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
593                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
594         }
595
596         if (rc == ELDLM_LOCK_ABORTED) {
597                 einfo->ei_mode = 0;
598                 memset(lockh, 0, sizeof(*lockh));
599                 rc = 0;
600         } else { /* rc = 0 */
601                 lock = ldlm_handle2lock(lockh);
602                 LASSERT(lock != NULL);
603
604                 /* If the server gave us back a different lock mode, we should
605                  * fix up our variables. */
606                 if (lock->l_req_mode != einfo->ei_mode) {
607                         ldlm_lock_addref(lockh, lock->l_req_mode);
608                         ldlm_lock_decref(lockh, einfo->ei_mode);
609                         einfo->ei_mode = lock->l_req_mode;
610                 }
611                 LDLM_LOCK_PUT(lock);
612         }
613
614         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
615         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
616
617         intent->it_disposition = (int)lockrep->lock_policy_res1;
618         intent->it_status = (int)lockrep->lock_policy_res2;
619         intent->it_lock_mode = einfo->ei_mode;
620         intent->it_lock_handle = lockh->cookie;
621         intent->it_data = req;
622
623         /* Technically speaking rq_transno must already be zero if
624          * it_status is in error, so the check is a bit redundant */
625         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
626                 mdc_clear_replay_flag(req, intent->it_status);
627
628         /* If we're doing an IT_OPEN which did not result in an actual
629          * successful open, then we need to remove the bit which saves
630          * this request for unconditional replay.
631          *
632          * It's important that we do this first!  Otherwise we might exit the
633          * function without doing so, and try to replay a failed create
634          * (bug 3440) */
635         if (it->it_op & IT_OPEN && req->rq_replay &&
636             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
637                 mdc_clear_replay_flag(req, intent->it_status);
638
639         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
640                   it->it_op, intent->it_disposition, intent->it_status);
641
642         /* We know what to expect, so we do any byte flipping required here */
643         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
644                 struct mdt_body *body;
645
646                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
647                 if (body == NULL) {
648                         CERROR ("Can't swab mdt_body\n");
649                         RETURN (-EPROTO);
650                 }
651
652                 if (it_disposition(it, DISP_OPEN_OPEN) &&
653                     !it_open_error(DISP_OPEN_OPEN, it)) {
654                         /*
655                          * If this is a successful OPEN request, we need to set
656                          * replay handler and data early, so that if replay
657                          * happens immediately after swabbing below, new reply
658                          * is swabbed by that handler correctly.
659                          */
660                         mdc_set_open_replay_data(NULL, NULL, it);
661                 }
662
663                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
664                         void *eadata;
665
666                         mdc_update_max_ea_from_body(exp, body);
667
668                         /*
669                          * The eadata is opaque; just check that it is there.
670                          * Eventually, obd_unpackmd() will check the contents.
671                          */
672                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
673                                                               body->eadatasize);
674                         if (eadata == NULL)
675                                 RETURN(-EPROTO);
676
677                         /* save lvb data and length in case this is for layout
678                          * lock */
679                         lvb_data = eadata;
680                         lvb_len = body->eadatasize;
681
682                         /*
683                          * We save the reply LOV EA in case we have to replay a
684                          * create for recovery.  If we didn't allocate a large
685                          * enough request buffer above we need to reallocate it
686                          * here to hold the actual LOV EA.
687                          *
688                          * To not save LOV EA if request is not going to replay
689                          * (for example error one).
690                          */
691                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
692                                 void *lmm;
693                                 if (req_capsule_get_size(pill, &RMF_EADATA,
694                                                          RCL_CLIENT) <
695                                     body->eadatasize)
696                                         mdc_realloc_openmsg(req, body);
697                                 else
698                                         req_capsule_shrink(pill, &RMF_EADATA,
699                                                            body->eadatasize,
700                                                            RCL_CLIENT);
701
702                                 req_capsule_set_size(pill, &RMF_EADATA,
703                                                      RCL_CLIENT,
704                                                      body->eadatasize);
705
706                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
707                                 if (lmm)
708                                         memcpy(lmm, eadata, body->eadatasize);
709                         }
710                 }
711
712                 if (body->valid & OBD_MD_FLRMTPERM) {
713                         struct mdt_remote_perm *perm;
714
715                         LASSERT(client_is_remote(exp));
716                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
717                                                 lustre_swab_mdt_remote_perm);
718                         if (perm == NULL)
719                                 RETURN(-EPROTO);
720                 }
721                 if (body->valid & OBD_MD_FLMDSCAPA) {
722                         struct lustre_capa *capa, *p;
723
724                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
725                         if (capa == NULL)
726                                 RETURN(-EPROTO);
727
728                         if (it->it_op & IT_OPEN) {
729                                 /* client fid capa will be checked in replay */
730                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
731                                 LASSERT(p);
732                                 *p = *capa;
733                         }
734                 }
735                 if (body->valid & OBD_MD_FLOSSCAPA) {
736                         struct lustre_capa *capa;
737
738                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
739                         if (capa == NULL)
740                                 RETURN(-EPROTO);
741                 }
742         } else if (it->it_op & IT_LAYOUT) {
743                 /* maybe the lock was granted right away and layout
744                  * is packed into RMF_DLM_LVB of req */
745                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
746                 if (lvb_len > 0) {
747                         lvb_data = req_capsule_server_sized_get(pill,
748                                                         &RMF_DLM_LVB, lvb_len);
749                         if (lvb_data == NULL)
750                                 RETURN(-EPROTO);
751                 }
752         }
753
754         /* fill in stripe data for layout lock */
755         lock = ldlm_handle2lock(lockh);
756         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
757                 void *lmm;
758
759                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
760                         ldlm_it2str(it->it_op), lvb_len);
761
762                 OBD_ALLOC_LARGE(lmm, lvb_len);
763                 if (lmm == NULL) {
764                         LDLM_LOCK_PUT(lock);
765                         RETURN(-ENOMEM);
766                 }
767                 memcpy(lmm, lvb_data, lvb_len);
768
769                 /* install lvb_data */
770                 lock_res_and_lock(lock);
771                 if (lock->l_lvb_data == NULL) {
772                         lock->l_lvb_type = LVB_T_LAYOUT;
773                         lock->l_lvb_data = lmm;
774                         lock->l_lvb_len = lvb_len;
775                         lmm = NULL;
776                 }
777                 unlock_res_and_lock(lock);
778                 if (lmm != NULL)
779                         OBD_FREE_LARGE(lmm, lvb_len);
780         }
781         if (lock != NULL)
782                 LDLM_LOCK_PUT(lock);
783
784         RETURN(rc);
785 }
786
787 /* We always reserve enough space in the reply packet for a stripe MD, because
788  * we don't know in advance the file type. */
789 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
790                 struct lookup_intent *it, struct md_op_data *op_data,
791                 struct lustre_handle *lockh, void *lmm, int lmmsize,
792                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
793 {
794         struct obd_device     *obddev = class_exp2obd(exp);
795         struct ptlrpc_request *req = NULL;
796         __u64                  flags, saved_flags = extra_lock_flags;
797         int                    rc;
798         struct ldlm_res_id res_id;
799         static const ldlm_policy_data_t lookup_policy =
800                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
801         static const ldlm_policy_data_t update_policy =
802                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
803         static const ldlm_policy_data_t layout_policy =
804                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
805         static const ldlm_policy_data_t getxattr_policy = {
806                               .l_inodebits = { MDS_INODELOCK_XATTR } };
807         ldlm_policy_data_t const *policy = &lookup_policy;
808         int                    generation, resends = 0;
809         struct ldlm_reply     *lockrep;
810         enum lvb_type          lvb_type = 0;
811         ENTRY;
812
813         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
814                  einfo->ei_type);
815
816         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
817
818         if (it) {
819                 saved_flags |= LDLM_FL_HAS_INTENT;
820                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
821                         policy = &update_policy;
822                 else if (it->it_op & IT_LAYOUT)
823                         policy = &layout_policy;
824                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
825                         policy = &getxattr_policy;
826         }
827
828         LASSERT(reqp == NULL);
829
830         generation = obddev->u.cli.cl_import->imp_generation;
831 resend:
832         flags = saved_flags;
833         if (!it) {
834                 /* The only way right now is FLOCK, in this case we hide flock
835                    policy as lmm, but lmmsize is 0 */
836                 LASSERT(lmm && lmmsize == 0);
837                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
838                          einfo->ei_type);
839                 policy = (ldlm_policy_data_t *)lmm;
840                 res_id.name[3] = LDLM_FLOCK;
841         } else if (it->it_op & IT_OPEN) {
842                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
843                                            einfo->ei_cbdata);
844                 policy = &update_policy;
845                 einfo->ei_cbdata = NULL;
846                 lmm = NULL;
847         } else if (it->it_op & IT_UNLINK) {
848                 req = mdc_intent_unlink_pack(exp, it, op_data);
849         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
850                 req = mdc_intent_getattr_pack(exp, it, op_data);
851         } else if (it->it_op & IT_READDIR) {
852                 req = mdc_enqueue_pack(exp, 0);
853         } else if (it->it_op & IT_LAYOUT) {
854                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
855                         RETURN(-EOPNOTSUPP);
856                 req = mdc_intent_layout_pack(exp, it, op_data);
857                 lvb_type = LVB_T_LAYOUT;
858         } else if (it->it_op & IT_GETXATTR) {
859                 req = mdc_intent_getxattr_pack(exp, it, op_data);
860         } else {
861                 LBUG();
862                 RETURN(-EINVAL);
863         }
864
865         if (IS_ERR(req))
866                 RETURN(PTR_ERR(req));
867
868         if (req != NULL && it && it->it_op & IT_CREAT)
869                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
870                  * retry logic */
871                 req->rq_no_retry_einprogress = 1;
872
873         if (resends) {
874                 req->rq_generation_set = 1;
875                 req->rq_import_generation = generation;
876                 req->rq_sent = cfs_time_current_sec() + resends;
877         }
878
879         /* It is important to obtain rpc_lock first (if applicable), so that
880          * threads that are serialised with rpc_lock are not polluting our
881          * rpcs in flight counter. We do not do flock request limiting, though*/
882         if (it) {
883                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
884                 rc = mdc_enter_request(&obddev->u.cli);
885                 if (rc != 0) {
886                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
887                         mdc_clear_replay_flag(req, 0);
888                         ptlrpc_req_finished(req);
889                         RETURN(rc);
890                 }
891         }
892
893         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
894                               0, lvb_type, lockh, 0);
895         if (!it) {
896                 /* For flock requests we immediatelly return without further
897                    delay and let caller deal with the rest, since rest of
898                    this function metadata processing makes no sense for flock
899                    requests anyway. But in case of problem during comms with
900                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
901                    can not rely on caller and this mainly for F_UNLCKs
902                    (explicits or automatically generated by Kernel to clean
903                    current FLocks upon exit) that can't be trashed */
904                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
905                     (einfo->ei_type == LDLM_FLOCK) &&
906                     (einfo->ei_mode == LCK_NL))
907                         goto resend;
908                 RETURN(rc);
909         }
910
911         mdc_exit_request(&obddev->u.cli);
912         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
913
914         if (rc < 0) {
915                 CERROR("ldlm_cli_enqueue: %d\n", rc);
916                 mdc_clear_replay_flag(req, rc);
917                 ptlrpc_req_finished(req);
918                 RETURN(rc);
919         }
920
921         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
922         LASSERT(lockrep != NULL);
923
924         lockrep->lock_policy_res2 =
925                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
926
927         /* Retry the create infinitely when we get -EINPROGRESS from
928          * server. This is required by the new quota design. */
929         if (it && it->it_op & IT_CREAT &&
930             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
931                 mdc_clear_replay_flag(req, rc);
932                 ptlrpc_req_finished(req);
933                 resends++;
934
935                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
936                        obddev->obd_name, resends, it->it_op,
937                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
938
939                 if (generation == obddev->u.cli.cl_import->imp_generation) {
940                         goto resend;
941                 } else {
942                         CDEBUG(D_HA, "resend cross eviction\n");
943                         RETURN(-EIO);
944                 }
945         }
946
947         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
948         if (rc < 0) {
949                 if (lustre_handle_is_used(lockh)) {
950                         ldlm_lock_decref(lockh, einfo->ei_mode);
951                         memset(lockh, 0, sizeof(*lockh));
952                 }
953                 ptlrpc_req_finished(req);
954         }
955         RETURN(rc);
956 }
957
958 static int mdc_finish_intent_lock(struct obd_export *exp,
959                                   struct ptlrpc_request *request,
960                                   struct md_op_data *op_data,
961                                   struct lookup_intent *it,
962                                   struct lustre_handle *lockh)
963 {
964         struct lustre_handle old_lock;
965         struct mdt_body *mdt_body;
966         struct ldlm_lock *lock;
967         int rc;
968         ENTRY;
969
970         LASSERT(request != NULL);
971         LASSERT(request != LP_POISON);
972         LASSERT(request->rq_repmsg != LP_POISON);
973
974         if (it->it_op & IT_READDIR)
975                 RETURN(0);
976
977         if (!it_disposition(it, DISP_IT_EXECD)) {
978                 /* The server failed before it even started executing the
979                  * intent, i.e. because it couldn't unpack the request. */
980                 LASSERT(it->d.lustre.it_status != 0);
981                 RETURN(it->d.lustre.it_status);
982         }
983         rc = it_open_error(DISP_IT_EXECD, it);
984         if (rc)
985                 RETURN(rc);
986
987         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
988         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
989
990         /* If we were revalidating a fid/name pair, mark the intent in
991          * case we fail and get called again from lookup */
992         if (fid_is_sane(&op_data->op_fid2) &&
993             it->it_create_mode & M_CHECK_STALE &&
994             it->it_op != IT_GETATTR) {
995                 /* Also: did we find the same inode? */
996                 /* sever can return one of two fids:
997                  * op_fid2 - new allocated fid - if file is created.
998                  * op_fid3 - existent fid - if file only open.
999                  * op_fid3 is saved in lmv_intent_open */
1000                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1001                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1002                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1003                                "\n", PFID(&op_data->op_fid2),
1004                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1005                         RETURN(-ESTALE);
1006                 }
1007         }
1008
1009         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1010         if (rc)
1011                 RETURN(rc);
1012
1013         /* keep requests around for the multiple phases of the call
1014          * this shows the DISP_XX must guarantee we make it into the call
1015          */
1016         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1017             it_disposition(it, DISP_OPEN_CREATE) &&
1018             !it_open_error(DISP_OPEN_CREATE, it)) {
1019                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1020                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1021         }
1022         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1023             it_disposition(it, DISP_OPEN_OPEN) &&
1024             !it_open_error(DISP_OPEN_OPEN, it)) {
1025                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1026                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1027                 /* BUG 11546 - eviction in the middle of open rpc processing */
1028                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1029         }
1030
1031         if (it->it_op & IT_CREAT) {
1032                 /* XXX this belongs in ll_create_it */
1033         } else if (it->it_op == IT_OPEN) {
1034                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1035         } else {
1036                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1037         }
1038
1039         /* If we already have a matching lock, then cancel the new
1040          * one.  We have to set the data here instead of in
1041          * mdc_enqueue, because we need to use the child's inode as
1042          * the l_ast_data to match, and that's not available until
1043          * intent_finish has performed the iget().) */
1044         lock = ldlm_handle2lock(lockh);
1045         if (lock) {
1046                 ldlm_policy_data_t policy = lock->l_policy_data;
1047                 LDLM_DEBUG(lock, "matching against this");
1048
1049                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1050                                          &lock->l_resource->lr_name),
1051                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1052                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1053                 LDLM_LOCK_PUT(lock);
1054
1055                 memcpy(&old_lock, lockh, sizeof(*lockh));
1056                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1057                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1058                         ldlm_lock_decref_and_cancel(lockh,
1059                                                     it->d.lustre.it_lock_mode);
1060                         memcpy(lockh, &old_lock, sizeof(old_lock));
1061                         it->d.lustre.it_lock_handle = lockh->cookie;
1062                 }
1063         }
1064         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1065                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1066                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1067         RETURN(rc);
1068 }
1069
1070 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1071                         struct lu_fid *fid, __u64 *bits)
1072 {
1073         /* We could just return 1 immediately, but since we should only
1074          * be called in revalidate_it if we already have a lock, let's
1075          * verify that. */
1076         struct ldlm_res_id res_id;
1077         struct lustre_handle lockh;
1078         ldlm_policy_data_t policy;
1079         ldlm_mode_t mode;
1080         ENTRY;
1081
1082         if (it->d.lustre.it_lock_handle) {
1083                 lockh.cookie = it->d.lustre.it_lock_handle;
1084                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1085         } else {
1086                 fid_build_reg_res_name(fid, &res_id);
1087                 switch (it->it_op) {
1088                 case IT_GETATTR:
1089                         /* File attributes are held under multiple bits:
1090                          * nlink is under lookup lock, size and times are
1091                          * under UPDATE lock and recently we've also got
1092                          * a separate permissions lock for owner/group/acl that
1093                          * were protected by lookup lock before.
1094                          * Getattr must provide all of that information,
1095                          * so we need to ensure we have all of those locks.
1096                          * Unfortunately, if the bits are split across multiple
1097                          * locks, there's no easy way to match all of them here,
1098                          * so an extra RPC would be performed to fetch all
1099                          * of those bits at once for now. */
1100                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1101                          * but for old MDTs (< 2.4), permission is covered
1102                          * by LOOKUP lock, so it needs to match all bits here.*/
1103                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1104                                                   MDS_INODELOCK_LOOKUP |
1105                                                   MDS_INODELOCK_PERM;
1106                         break;
1107                 case IT_READDIR:
1108                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1109                         break;
1110                 case IT_LAYOUT:
1111                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1112                         break;
1113                 default:
1114                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1115                         break;
1116                 }
1117
1118                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1119                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1120                                        LDLM_IBITS, &policy,
1121                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1122         }
1123
1124         if (mode) {
1125                 it->d.lustre.it_lock_handle = lockh.cookie;
1126                 it->d.lustre.it_lock_mode = mode;
1127         } else {
1128                 it->d.lustre.it_lock_handle = 0;
1129                 it->d.lustre.it_lock_mode = 0;
1130         }
1131
1132         RETURN(!!mode);
1133 }
1134
1135 /*
1136  * This long block is all about fixing up the lock and request state
1137  * so that it is correct as of the moment _before_ the operation was
1138  * applied; that way, the VFS will think that everything is normal and
1139  * call Lustre's regular VFS methods.
1140  *
1141  * If we're performing a creation, that means that unless the creation
1142  * failed with EEXIST, we should fake up a negative dentry.
1143  *
1144  * For everything else, we want to lookup to succeed.
1145  *
1146  * One additional note: if CREATE or OPEN succeeded, we add an extra
1147  * reference to the request because we need to keep it around until
1148  * ll_create/ll_open gets called.
1149  *
1150  * The server will return to us, in it_disposition, an indication of
1151  * exactly what d.lustre.it_status refers to.
1152  *
1153  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1154  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1155  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1156  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1157  * was successful.
1158  *
1159  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1160  * child lookup.
1161  */
1162 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1163                     void *lmm, int lmmsize, struct lookup_intent *it,
1164                     int lookup_flags, struct ptlrpc_request **reqp,
1165                     ldlm_blocking_callback cb_blocking,
1166                     __u64 extra_lock_flags)
1167 {
1168         struct ldlm_enqueue_info einfo = {
1169                 .ei_type        = LDLM_IBITS,
1170                 .ei_mode        = it_to_lock_mode(it),
1171                 .ei_cb_bl       = cb_blocking,
1172                 .ei_cb_cp       = ldlm_completion_ast,
1173         };
1174         struct lustre_handle lockh;
1175         int rc = 0;
1176         ENTRY;
1177         LASSERT(it);
1178
1179         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1180                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1181                 op_data->op_name, PFID(&op_data->op_fid2),
1182                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1183                 it->it_flags);
1184
1185         lockh.cookie = 0;
1186         if (fid_is_sane(&op_data->op_fid2) &&
1187             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1188                 /* We could just return 1 immediately, but since we should only
1189                  * be called in revalidate_it if we already have a lock, let's
1190                  * verify that. */
1191                 it->d.lustre.it_lock_handle = 0;
1192                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1193                 /* Only return failure if it was not GETATTR by cfid
1194                    (from inode_revalidate) */
1195                 if (rc || op_data->op_namelen != 0)
1196                         RETURN(rc);
1197         }
1198
1199         /* For case if upper layer did not alloc fid, do it now. */
1200         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1201                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1202                 if (rc < 0) {
1203                         CERROR("Can't alloc new fid, rc %d\n", rc);
1204                         RETURN(rc);
1205                 }
1206         }
1207         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1208                          extra_lock_flags);
1209         if (rc < 0)
1210                 RETURN(rc);
1211
1212         *reqp = it->d.lustre.it_data;
1213         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1214         RETURN(rc);
1215 }
1216
1217 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1218                                               struct ptlrpc_request *req,
1219                                               void *args, int rc)
1220 {
1221         struct mdc_getattr_args  *ga = args;
1222         struct obd_export        *exp = ga->ga_exp;
1223         struct md_enqueue_info   *minfo = ga->ga_minfo;
1224         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1225         struct lookup_intent     *it;
1226         struct lustre_handle     *lockh;
1227         struct obd_device        *obddev;
1228         struct ldlm_reply        *lockrep;
1229         __u64                     flags = LDLM_FL_HAS_INTENT;
1230         ENTRY;
1231
1232         it    = &minfo->mi_it;
1233         lockh = &minfo->mi_lockh;
1234
1235         obddev = class_exp2obd(exp);
1236
1237         mdc_exit_request(&obddev->u.cli);
1238         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1239                 rc = -ETIMEDOUT;
1240
1241         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1242                                    &flags, NULL, 0, lockh, rc);
1243         if (rc < 0) {
1244                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1245                 mdc_clear_replay_flag(req, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1250         LASSERT(lockrep != NULL);
1251
1252         lockrep->lock_policy_res2 =
1253                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1254
1255         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1256         if (rc)
1257                 GOTO(out, rc);
1258
1259         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1260         EXIT;
1261
1262 out:
1263         OBD_FREE_PTR(einfo);
1264         minfo->mi_cb(req, minfo, rc);
1265         return 0;
1266 }
1267
1268 int mdc_intent_getattr_async(struct obd_export *exp,
1269                              struct md_enqueue_info *minfo,
1270                              struct ldlm_enqueue_info *einfo)
1271 {
1272         struct md_op_data       *op_data = &minfo->mi_data;
1273         struct lookup_intent    *it = &minfo->mi_it;
1274         struct ptlrpc_request   *req;
1275         struct mdc_getattr_args *ga;
1276         struct obd_device       *obddev = class_exp2obd(exp);
1277         struct ldlm_res_id       res_id;
1278         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1279          *     for statahead currently. Consider CMD in future, such two bits
1280          *     maybe managed by different MDS, should be adjusted then. */
1281         ldlm_policy_data_t       policy = {
1282                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1283                                                          MDS_INODELOCK_UPDATE }
1284                                  };
1285         int                      rc = 0;
1286         __u64                    flags = LDLM_FL_HAS_INTENT;
1287         ENTRY;
1288
1289         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1290                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1291                 ldlm_it2str(it->it_op), it->it_flags);
1292
1293         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1294         req = mdc_intent_getattr_pack(exp, it, op_data);
1295         if (IS_ERR(req))
1296                 RETURN(PTR_ERR(req));
1297
1298         rc = mdc_enter_request(&obddev->u.cli);
1299         if (rc != 0) {
1300                 ptlrpc_req_finished(req);
1301                 RETURN(rc);
1302         }
1303
1304         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1305                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1306         if (rc < 0) {
1307                 mdc_exit_request(&obddev->u.cli);
1308                 ptlrpc_req_finished(req);
1309                 RETURN(rc);
1310         }
1311
1312         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1313         ga = ptlrpc_req_async_args(req);
1314         ga->ga_exp = exp;
1315         ga->ga_minfo = minfo;
1316         ga->ga_einfo = einfo;
1317
1318         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1319         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1320
1321         RETURN(0);
1322 }