Whamcloud - gitweb
LU-3963 cleanup: C89 and build cleanups
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
54
/*
 * Groups the three pointers that travel together with a getattr request.
 * NOTE(review): the code that fills in and consumes this structure is not
 * in this part of the file; confirm the exact lifetime rules there.
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;       /* export in use */
        struct md_enqueue_info   *ga_minfo;     /* metadata enqueue info */
        struct ldlm_enqueue_info *ga_einfo;     /* ldlm enqueue info */
};
60
61 int it_open_error(int phase, struct lookup_intent *it)
62 {
63         if (it_disposition(it, DISP_OPEN_LEASE)) {
64                 if (phase >= DISP_OPEN_LEASE)
65                         return it->d.lustre.it_status;
66                 else
67                         return 0;
68         }
69         if (it_disposition(it, DISP_OPEN_OPEN)) {
70                 if (phase >= DISP_OPEN_OPEN)
71                         return it->d.lustre.it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_OPEN_CREATE)) {
77                 if (phase >= DISP_OPEN_CREATE)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84                 if (phase >= DISP_LOOKUP_EXECD)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_IT_EXECD)) {
91                 if (phase >= DISP_IT_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97                it->d.lustre.it_status);
98         LBUG();
99         return 0;
100 }
101 EXPORT_SYMBOL(it_open_error);
102
103 /* this must be called on a lockh that is known to have a referenced lock */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
105                       __u64 *bits)
106 {
107         struct ldlm_lock *lock;
108         struct inode *new_inode = data;
109         ENTRY;
110
111         if(bits)
112                 *bits = 0;
113
114         if (!*lockh)
115                 RETURN(0);
116
117         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
118
119         LASSERT(lock != NULL);
120         lock_res_and_lock(lock);
121 #ifdef __KERNEL__
122         if (lock->l_resource->lr_lvb_inode &&
123             lock->l_resource->lr_lvb_inode != data) {
124                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125                 LASSERTF(old_inode->i_state & I_FREEING,
126                          "Found existing inode %p/%lu/%u state %lu in lock: "
127                          "setting data to %p/%lu/%u\n", old_inode,
128                          old_inode->i_ino, old_inode->i_generation,
129                          old_inode->i_state,
130                          new_inode, new_inode->i_ino, new_inode->i_generation);
131         }
132 #endif
133         lock->l_resource->lr_lvb_inode = new_inode;
134         if (bits)
135                 *bits = lock->l_policy_data.l_inodebits.bits;
136
137         unlock_res_and_lock(lock);
138         LDLM_LOCK_PUT(lock);
139
140         RETURN(0);
141 }
142
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144                            const struct lu_fid *fid, ldlm_type_t type,
145                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
146                            struct lustre_handle *lockh)
147 {
148         struct ldlm_res_id res_id;
149         ldlm_mode_t rc;
150         ENTRY;
151
152         fid_build_reg_res_name(fid, &res_id);
153         /* LU-4405: Clear bits not supported by server */
154         policy->l_inodebits.bits &= exp_connect_ibits(exp);
155         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156                              &res_id, type, policy, mode, lockh, 0);
157         RETURN(rc);
158 }
159
160 int mdc_cancel_unused(struct obd_export *exp,
161                       const struct lu_fid *fid,
162                       ldlm_policy_data_t *policy,
163                       ldlm_mode_t mode,
164                       ldlm_cancel_flags_t flags,
165                       void *opaque)
166 {
167         struct ldlm_res_id res_id;
168         struct obd_device *obd = class_exp2obd(exp);
169         int rc;
170
171         ENTRY;
172
173         fid_build_reg_res_name(fid, &res_id);
174         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175                                              policy, mode, flags, opaque);
176         RETURN(rc);
177 }
178
179 int mdc_null_inode(struct obd_export *exp,
180                    const struct lu_fid *fid)
181 {
182         struct ldlm_res_id res_id;
183         struct ldlm_resource *res;
184         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
185         ENTRY;
186
187         LASSERTF(ns != NULL, "no namespace passed\n");
188
189         fid_build_reg_res_name(fid, &res_id);
190
191         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
192         if(res == NULL)
193                 RETURN(0);
194
195         lock_res(res);
196         res->lr_lvb_inode = NULL;
197         unlock_res(res);
198
199         ldlm_resource_putref(res);
200         RETURN(0);
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267                                                    struct lookup_intent *it,
268                                                    struct md_op_data *op_data,
269                                                    void *lmm, int lmmsize,
270                                                    void *cb_data)
271 {
272         struct ptlrpc_request *req;
273         struct obd_device     *obddev = class_exp2obd(exp);
274         struct ldlm_intent    *lit;
275         CFS_LIST_HEAD(cancels);
276         int                    count = 0;
277         int                    mode;
278         int                    rc;
279         ENTRY;
280
281         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
282
283         /* XXX: openlock is not cancelled for cross-refs. */
284         /* If inode is known, cancel conflicting OPEN locks. */
285         if (fid_is_sane(&op_data->op_fid2)) {
286                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
287                         if (it->it_flags & FMODE_WRITE)
288                                 mode = LCK_EX;
289                         else
290                                 mode = LCK_PR;
291                 } else {
292                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
293                                 mode = LCK_CW;
294 #ifdef FMODE_EXEC
295                         else if (it->it_flags & FMODE_EXEC)
296                                 mode = LCK_PR;
297 #endif
298                         else
299                                 mode = LCK_CR;
300                 }
301                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
302                                                 &cancels, mode,
303                                                 MDS_INODELOCK_OPEN);
304         }
305
306         /* If CREATE, cancel parent's UPDATE lock. */
307         if (it->it_op & IT_CREAT)
308                 mode = LCK_EX;
309         else
310                 mode = LCK_CR;
311         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
312                                          &cancels, mode,
313                                          MDS_INODELOCK_UPDATE);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
316                                    &RQF_LDLM_INTENT_OPEN);
317         if (req == NULL) {
318                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
319                 RETURN(ERR_PTR(-ENOMEM));
320         }
321
322         /* parent capability */
323         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
324         /* child capability, reserve the size according to parent capa, it will
325          * be filled after we get the reply */
326         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
327
328         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
329                              op_data->op_namelen + 1);
330         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
331                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
332
333         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334         if (rc < 0) {
335                 ptlrpc_request_free(req);
336                 RETURN(ERR_PTR(rc));
337         }
338
339         spin_lock(&req->rq_lock);
340         req->rq_replay = req->rq_import->imp_replayable;
341         spin_unlock(&req->rq_lock);
342
343         /* pack the intent */
344         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
345         lit->opc = (__u64)it->it_op;
346
347         /* pack the intended request */
348         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
349                       lmmsize);
350
351         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
352                              obddev->u.cli.cl_max_mds_easize);
353
354         /* for remote client, fetch remote perm for current user */
355         if (client_is_remote(exp))
356                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357                                      sizeof(struct mdt_remote_perm));
358         ptlrpc_request_set_replen(req);
359         return req;
360 }
361
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364                          struct lookup_intent *it,
365                          struct md_op_data *op_data)
366 {
367         struct ptlrpc_request   *req;
368         struct ldlm_intent      *lit;
369         int                     rc, count = 0, maxdata;
370         CFS_LIST_HEAD(cancels);
371
372         ENTRY;
373
374         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375                                         &RQF_LDLM_INTENT_GETXATTR);
376         if (req == NULL)
377                 RETURN(ERR_PTR(-ENOMEM));
378
379         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
380
381         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(ERR_PTR(rc));
385         }
386
387         /* pack the intent */
388         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389         lit->opc = IT_GETXATTR;
390
391         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
392
393         /* pack the intended request */
394         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395                         op_data->op_valid, maxdata, -1, 0);
396
397         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398                                 RCL_SERVER, maxdata);
399
400         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401                                 RCL_SERVER, maxdata);
402
403         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404                                 RCL_SERVER, maxdata);
405
406         ptlrpc_request_set_replen(req);
407
408         RETURN(req);
409 }
410
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412                                                      struct lookup_intent *it,
413                                                      struct md_op_data *op_data)
414 {
415         struct ptlrpc_request *req;
416         struct obd_device     *obddev = class_exp2obd(exp);
417         struct ldlm_intent    *lit;
418         int                    rc;
419         ENTRY;
420
421         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422                                    &RQF_LDLM_INTENT_UNLINK);
423         if (req == NULL)
424                 RETURN(ERR_PTR(-ENOMEM));
425
426         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
427         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
428                              op_data->op_namelen + 1);
429
430         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
431         if (rc) {
432                 ptlrpc_request_free(req);
433                 RETURN(ERR_PTR(rc));
434         }
435
436         /* pack the intent */
437         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
438         lit->opc = (__u64)it->it_op;
439
440         /* pack the intended request */
441         mdc_unlink_pack(req, op_data);
442
443         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444                              obddev->u.cli.cl_default_mds_easize);
445         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
446                              obddev->u.cli.cl_default_mds_cookiesize);
447         ptlrpc_request_set_replen(req);
448         RETURN(req);
449 }
450
451 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
452                                                       struct lookup_intent *it,
453                                                       struct md_op_data *op_data)
454 {
455         struct ptlrpc_request *req;
456         struct obd_device     *obddev = class_exp2obd(exp);
457         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
460                                        (client_is_remote(exp) ?
461                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
462         struct ldlm_intent    *lit;
463         int                    rc;
464         ENTRY;
465
466         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467                                    &RQF_LDLM_INTENT_GETATTR);
468         if (req == NULL)
469                 RETURN(ERR_PTR(-ENOMEM));
470
471         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473                              op_data->op_namelen + 1);
474
475         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
476         if (rc) {
477                 ptlrpc_request_free(req);
478                 RETURN(ERR_PTR(rc));
479         }
480
481         /* pack the intent */
482         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483         lit->opc = (__u64)it->it_op;
484
485         /* pack the intended request */
486         mdc_getattr_pack(req, valid, it->it_flags, op_data,
487                          obddev->u.cli.cl_default_mds_easize);
488
489         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490                              obddev->u.cli.cl_default_mds_easize);
491         if (client_is_remote(exp))
492                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493                                      sizeof(struct mdt_remote_perm));
494         ptlrpc_request_set_replen(req);
495         RETURN(req);
496 }
497
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499                                                      struct lookup_intent *it,
500                                                      struct md_op_data *unused)
501 {
502         struct obd_device     *obd = class_exp2obd(exp);
503         struct ptlrpc_request *req;
504         struct ldlm_intent    *lit;
505         struct layout_intent  *layout;
506         int rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
510                                 &RQF_LDLM_INTENT_LAYOUT);
511         if (req == NULL)
512                 RETURN(ERR_PTR(-ENOMEM));
513
514         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
515         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
516         if (rc) {
517                 ptlrpc_request_free(req);
518                 RETURN(ERR_PTR(rc));
519         }
520
521         /* pack the intent */
522         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
523         lit->opc = (__u64)it->it_op;
524
525         /* pack the layout intent request */
526         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
527         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
528          * set for replication */
529         layout->li_opc = LAYOUT_INTENT_ACCESS;
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
532                              obd->u.cli.cl_default_mds_easize);
533         ptlrpc_request_set_replen(req);
534         RETURN(req);
535 }
536
537 static struct ptlrpc_request *
538 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
539 {
540         struct ptlrpc_request *req;
541         int rc;
542         ENTRY;
543
544         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
545         if (req == NULL)
546                 RETURN(ERR_PTR(-ENOMEM));
547
548         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
549         if (rc) {
550                 ptlrpc_request_free(req);
551                 RETURN(ERR_PTR(rc));
552         }
553
554         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555         ptlrpc_request_set_replen(req);
556         RETURN(req);
557 }
558
559 static int mdc_finish_enqueue(struct obd_export *exp,
560                               struct ptlrpc_request *req,
561                               struct ldlm_enqueue_info *einfo,
562                               struct lookup_intent *it,
563                               struct lustre_handle *lockh,
564                               int rc)
565 {
566         struct req_capsule  *pill = &req->rq_pill;
567         struct ldlm_request *lockreq;
568         struct ldlm_reply   *lockrep;
569         struct lustre_intent_data *intent = &it->d.lustre;
570         struct ldlm_lock    *lock;
571         void                *lvb_data = NULL;
572         int                  lvb_len = 0;
573         ENTRY;
574
575         LASSERT(rc >= 0);
576         /* Similarly, if we're going to replay this request, we don't want to
577          * actually get a lock, just perform the intent. */
578         if (req->rq_transno || req->rq_replay) {
579                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
580                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
581         }
582
583         if (rc == ELDLM_LOCK_ABORTED) {
584                 einfo->ei_mode = 0;
585                 memset(lockh, 0, sizeof(*lockh));
586                 rc = 0;
587         } else { /* rc = 0 */
588                 lock = ldlm_handle2lock(lockh);
589                 LASSERT(lock != NULL);
590
591                 /* If the server gave us back a different lock mode, we should
592                  * fix up our variables. */
593                 if (lock->l_req_mode != einfo->ei_mode) {
594                         ldlm_lock_addref(lockh, lock->l_req_mode);
595                         ldlm_lock_decref(lockh, einfo->ei_mode);
596                         einfo->ei_mode = lock->l_req_mode;
597                 }
598                 LDLM_LOCK_PUT(lock);
599         }
600
601         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
602         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
603
604         intent->it_disposition = (int)lockrep->lock_policy_res1;
605         intent->it_status = (int)lockrep->lock_policy_res2;
606         intent->it_lock_mode = einfo->ei_mode;
607         intent->it_lock_handle = lockh->cookie;
608         intent->it_data = req;
609
610         /* Technically speaking rq_transno must already be zero if
611          * it_status is in error, so the check is a bit redundant */
612         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
613                 mdc_clear_replay_flag(req, intent->it_status);
614
615         /* If we're doing an IT_OPEN which did not result in an actual
616          * successful open, then we need to remove the bit which saves
617          * this request for unconditional replay.
618          *
619          * It's important that we do this first!  Otherwise we might exit the
620          * function without doing so, and try to replay a failed create
621          * (bug 3440) */
622         if (it->it_op & IT_OPEN && req->rq_replay &&
623             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
624                 mdc_clear_replay_flag(req, intent->it_status);
625
626         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
627                   it->it_op, intent->it_disposition, intent->it_status);
628
629         /* We know what to expect, so we do any byte flipping required here */
630         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
631                 struct mdt_body *body;
632
633                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
634                 if (body == NULL) {
635                         CERROR ("Can't swab mdt_body\n");
636                         RETURN (-EPROTO);
637                 }
638
639                 if (it_disposition(it, DISP_OPEN_OPEN) &&
640                     !it_open_error(DISP_OPEN_OPEN, it)) {
641                         /*
642                          * If this is a successful OPEN request, we need to set
643                          * replay handler and data early, so that if replay
644                          * happens immediately after swabbing below, new reply
645                          * is swabbed by that handler correctly.
646                          */
647                         mdc_set_open_replay_data(NULL, NULL, it);
648                 }
649
650                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
651                         void *eadata;
652
653                         mdc_update_max_ea_from_body(exp, body);
654
655                         /*
656                          * The eadata is opaque; just check that it is there.
657                          * Eventually, obd_unpackmd() will check the contents.
658                          */
659                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
660                                                               body->eadatasize);
661                         if (eadata == NULL)
662                                 RETURN(-EPROTO);
663
664                         /* save lvb data and length in case this is for layout
665                          * lock */
666                         lvb_data = eadata;
667                         lvb_len = body->eadatasize;
668
669                         /*
670                          * We save the reply LOV EA in case we have to replay a
671                          * create for recovery.  If we didn't allocate a large
672                          * enough request buffer above we need to reallocate it
673                          * here to hold the actual LOV EA.
674                          *
675                          * To not save LOV EA if request is not going to replay
676                          * (for example error one).
677                          */
678                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
679                                 void *lmm;
680                                 if (req_capsule_get_size(pill, &RMF_EADATA,
681                                                          RCL_CLIENT) <
682                                     body->eadatasize)
683                                         mdc_realloc_openmsg(req, body);
684                                 else
685                                         req_capsule_shrink(pill, &RMF_EADATA,
686                                                            body->eadatasize,
687                                                            RCL_CLIENT);
688
689                                 req_capsule_set_size(pill, &RMF_EADATA,
690                                                      RCL_CLIENT,
691                                                      body->eadatasize);
692
693                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
694                                 if (lmm)
695                                         memcpy(lmm, eadata, body->eadatasize);
696                         }
697                 }
698
699                 if (body->valid & OBD_MD_FLRMTPERM) {
700                         struct mdt_remote_perm *perm;
701
702                         LASSERT(client_is_remote(exp));
703                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
704                                                 lustre_swab_mdt_remote_perm);
705                         if (perm == NULL)
706                                 RETURN(-EPROTO);
707                 }
708                 if (body->valid & OBD_MD_FLMDSCAPA) {
709                         struct lustre_capa *capa, *p;
710
711                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
712                         if (capa == NULL)
713                                 RETURN(-EPROTO);
714
715                         if (it->it_op & IT_OPEN) {
716                                 /* client fid capa will be checked in replay */
717                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
718                                 LASSERT(p);
719                                 *p = *capa;
720                         }
721                 }
722                 if (body->valid & OBD_MD_FLOSSCAPA) {
723                         struct lustre_capa *capa;
724
725                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
726                         if (capa == NULL)
727                                 RETURN(-EPROTO);
728                 }
729         } else if (it->it_op & IT_LAYOUT) {
730                 /* maybe the lock was granted right away and layout
731                  * is packed into RMF_DLM_LVB of req */
732                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
733                 if (lvb_len > 0) {
734                         lvb_data = req_capsule_server_sized_get(pill,
735                                                         &RMF_DLM_LVB, lvb_len);
736                         if (lvb_data == NULL)
737                                 RETURN(-EPROTO);
738                 }
739         }
740
741         /* fill in stripe data for layout lock */
742         lock = ldlm_handle2lock(lockh);
743         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
744                 void *lmm;
745
746                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
747                         ldlm_it2str(it->it_op), lvb_len);
748
749                 OBD_ALLOC_LARGE(lmm, lvb_len);
750                 if (lmm == NULL) {
751                         LDLM_LOCK_PUT(lock);
752                         RETURN(-ENOMEM);
753                 }
754                 memcpy(lmm, lvb_data, lvb_len);
755
756                 /* install lvb_data */
757                 lock_res_and_lock(lock);
758                 if (lock->l_lvb_data == NULL) {
759                         lock->l_lvb_type = LVB_T_LAYOUT;
760                         lock->l_lvb_data = lmm;
761                         lock->l_lvb_len = lvb_len;
762                         lmm = NULL;
763                 }
764                 unlock_res_and_lock(lock);
765                 if (lmm != NULL)
766                         OBD_FREE_LARGE(lmm, lvb_len);
767         }
768         if (lock != NULL)
769                 LDLM_LOCK_PUT(lock);
770
771         RETURN(rc);
772 }
773
774 /* We always reserve enough space in the reply packet for a stripe MD, because
775  * we don't know in advance the file type. */
776 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
777                 struct lookup_intent *it, struct md_op_data *op_data,
778                 struct lustre_handle *lockh, void *lmm, int lmmsize,
779                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
780 {
781         struct obd_device     *obddev = class_exp2obd(exp);
782         struct ptlrpc_request *req = NULL;
783         __u64                  flags, saved_flags = extra_lock_flags;
784         int                    rc;
785         struct ldlm_res_id res_id;
786         static const ldlm_policy_data_t lookup_policy =
787                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
788         static const ldlm_policy_data_t update_policy =
789                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
790         static const ldlm_policy_data_t layout_policy =
791                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
792         static const ldlm_policy_data_t getxattr_policy = {
793                               .l_inodebits = { MDS_INODELOCK_XATTR } };
794         ldlm_policy_data_t const *policy = &lookup_policy;
795         int                    generation, resends = 0;
796         struct ldlm_reply     *lockrep;
797         enum lvb_type          lvb_type = 0;
798         ENTRY;
799
800         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
801                  einfo->ei_type);
802
803         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
804
805         if (it) {
806                 saved_flags |= LDLM_FL_HAS_INTENT;
807                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
808                         policy = &update_policy;
809                 else if (it->it_op & IT_LAYOUT)
810                         policy = &layout_policy;
811                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812                         policy = &getxattr_policy;
813         }
814
815         LASSERT(reqp == NULL);
816
817         generation = obddev->u.cli.cl_import->imp_generation;
818 resend:
819         flags = saved_flags;
820         if (!it) {
821                 /* The only way right now is FLOCK, in this case we hide flock
822                    policy as lmm, but lmmsize is 0 */
823                 LASSERT(lmm && lmmsize == 0);
824                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
825                          einfo->ei_type);
826                 policy = (ldlm_policy_data_t *)lmm;
827                 res_id.name[3] = LDLM_FLOCK;
828         } else if (it->it_op & IT_OPEN) {
829                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
830                                            einfo->ei_cbdata);
831                 policy = &update_policy;
832                 einfo->ei_cbdata = NULL;
833                 lmm = NULL;
834         } else if (it->it_op & IT_UNLINK) {
835                 req = mdc_intent_unlink_pack(exp, it, op_data);
836         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
837                 req = mdc_intent_getattr_pack(exp, it, op_data);
838         } else if (it->it_op & IT_READDIR) {
839                 req = mdc_enqueue_pack(exp, 0);
840         } else if (it->it_op & IT_LAYOUT) {
841                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
842                         RETURN(-EOPNOTSUPP);
843                 req = mdc_intent_layout_pack(exp, it, op_data);
844                 lvb_type = LVB_T_LAYOUT;
845         } else if (it->it_op & IT_GETXATTR) {
846                 req = mdc_intent_getxattr_pack(exp, it, op_data);
847         } else {
848                 LBUG();
849                 RETURN(-EINVAL);
850         }
851
852         if (IS_ERR(req))
853                 RETURN(PTR_ERR(req));
854
855         if (req != NULL && it && it->it_op & IT_CREAT)
856                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
857                  * retry logic */
858                 req->rq_no_retry_einprogress = 1;
859
860         if (resends) {
861                 req->rq_generation_set = 1;
862                 req->rq_import_generation = generation;
863                 req->rq_sent = cfs_time_current_sec() + resends;
864         }
865
866         /* It is important to obtain rpc_lock first (if applicable), so that
867          * threads that are serialised with rpc_lock are not polluting our
868          * rpcs in flight counter. We do not do flock request limiting, though*/
869         if (it) {
870                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
871                 rc = mdc_enter_request(&obddev->u.cli);
872                 if (rc != 0) {
873                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
874                         mdc_clear_replay_flag(req, 0);
875                         ptlrpc_req_finished(req);
876                         RETURN(rc);
877                 }
878         }
879
880         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
881                               0, lvb_type, lockh, 0);
882         if (!it) {
883                 /* For flock requests we immediatelly return without further
884                    delay and let caller deal with the rest, since rest of
885                    this function metadata processing makes no sense for flock
886                    requests anyway. But in case of problem during comms with
887                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
888                    can not rely on caller and this mainly for F_UNLCKs
889                    (explicits or automatically generated by Kernel to clean
890                    current FLocks upon exit) that can't be trashed */
891                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
892                     (einfo->ei_type == LDLM_FLOCK) &&
893                     (einfo->ei_mode == LCK_NL))
894                         goto resend;
895                 RETURN(rc);
896         }
897
898         mdc_exit_request(&obddev->u.cli);
899         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
900
901         if (rc < 0) {
902                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
903                              "%s: ldlm_cli_enqueue failed: rc = %d\n",
904                              obddev->obd_name, rc);
905
906                 mdc_clear_replay_flag(req, rc);
907                 ptlrpc_req_finished(req);
908                 RETURN(rc);
909         }
910
911         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
912         LASSERT(lockrep != NULL);
913
914         lockrep->lock_policy_res2 =
915                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
916
917         /* Retry the create infinitely when we get -EINPROGRESS from
918          * server. This is required by the new quota design. */
919         if (it && it->it_op & IT_CREAT &&
920             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
921                 mdc_clear_replay_flag(req, rc);
922                 ptlrpc_req_finished(req);
923                 resends++;
924
925                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926                        obddev->obd_name, resends, it->it_op,
927                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
928
929                 if (generation == obddev->u.cli.cl_import->imp_generation) {
930                         goto resend;
931                 } else {
932                         CDEBUG(D_HA, "resend cross eviction\n");
933                         RETURN(-EIO);
934                 }
935         }
936
937         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
938         if (rc < 0) {
939                 if (lustre_handle_is_used(lockh)) {
940                         ldlm_lock_decref(lockh, einfo->ei_mode);
941                         memset(lockh, 0, sizeof(*lockh));
942                 }
943                 ptlrpc_req_finished(req);
944         }
945         RETURN(rc);
946 }
947
948 static int mdc_finish_intent_lock(struct obd_export *exp,
949                                   struct ptlrpc_request *request,
950                                   struct md_op_data *op_data,
951                                   struct lookup_intent *it,
952                                   struct lustre_handle *lockh)
953 {
954         struct lustre_handle old_lock;
955         struct mdt_body *mdt_body;
956         struct ldlm_lock *lock;
957         int rc;
958         ENTRY;
959
960         LASSERT(request != NULL);
961         LASSERT(request != LP_POISON);
962         LASSERT(request->rq_repmsg != LP_POISON);
963
964         if (it->it_op & IT_READDIR)
965                 RETURN(0);
966
967         if (!it_disposition(it, DISP_IT_EXECD)) {
968                 /* The server failed before it even started executing the
969                  * intent, i.e. because it couldn't unpack the request. */
970                 LASSERT(it->d.lustre.it_status != 0);
971                 RETURN(it->d.lustre.it_status);
972         }
973         rc = it_open_error(DISP_IT_EXECD, it);
974         if (rc)
975                 RETURN(rc);
976
977         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
978         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
979
980         /* If we were revalidating a fid/name pair, mark the intent in
981          * case we fail and get called again from lookup */
982         if (fid_is_sane(&op_data->op_fid2) &&
983             it->it_create_mode & M_CHECK_STALE &&
984             it->it_op != IT_GETATTR) {
985                 /* Also: did we find the same inode? */
986                 /* sever can return one of two fids:
987                  * op_fid2 - new allocated fid - if file is created.
988                  * op_fid3 - existent fid - if file only open.
989                  * op_fid3 is saved in lmv_intent_open */
990                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
991                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
992                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
993                                "\n", PFID(&op_data->op_fid2),
994                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
995                         RETURN(-ESTALE);
996                 }
997         }
998
999         rc = it_open_error(DISP_LOOKUP_EXECD, it);
1000         if (rc)
1001                 RETURN(rc);
1002
1003         /* keep requests around for the multiple phases of the call
1004          * this shows the DISP_XX must guarantee we make it into the call
1005          */
1006         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1007             it_disposition(it, DISP_OPEN_CREATE) &&
1008             !it_open_error(DISP_OPEN_CREATE, it)) {
1009                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1010                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1011         }
1012         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1013             it_disposition(it, DISP_OPEN_OPEN) &&
1014             !it_open_error(DISP_OPEN_OPEN, it)) {
1015                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1016                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1017                 /* BUG 11546 - eviction in the middle of open rpc processing */
1018                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1019         }
1020
1021         if (it->it_op & IT_CREAT) {
1022                 /* XXX this belongs in ll_create_it */
1023         } else if (it->it_op == IT_OPEN) {
1024                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1025         } else {
1026                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1027         }
1028
1029         /* If we already have a matching lock, then cancel the new
1030          * one.  We have to set the data here instead of in
1031          * mdc_enqueue, because we need to use the child's inode as
1032          * the l_ast_data to match, and that's not available until
1033          * intent_finish has performed the iget().) */
1034         lock = ldlm_handle2lock(lockh);
1035         if (lock) {
1036                 ldlm_policy_data_t policy = lock->l_policy_data;
1037                 LDLM_DEBUG(lock, "matching against this");
1038
1039                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1040                                          &lock->l_resource->lr_name),
1041                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1042                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1043                 LDLM_LOCK_PUT(lock);
1044
1045                 memcpy(&old_lock, lockh, sizeof(*lockh));
1046                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1047                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1048                         ldlm_lock_decref_and_cancel(lockh,
1049                                                     it->d.lustre.it_lock_mode);
1050                         memcpy(lockh, &old_lock, sizeof(old_lock));
1051                         it->d.lustre.it_lock_handle = lockh->cookie;
1052                 }
1053         }
1054         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1055                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1056                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1057         RETURN(rc);
1058 }
1059
1060 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1061                         struct lu_fid *fid, __u64 *bits)
1062 {
1063         /* We could just return 1 immediately, but since we should only
1064          * be called in revalidate_it if we already have a lock, let's
1065          * verify that. */
1066         struct ldlm_res_id res_id;
1067         struct lustre_handle lockh;
1068         ldlm_policy_data_t policy;
1069         ldlm_mode_t mode;
1070         ENTRY;
1071
1072         if (it->d.lustre.it_lock_handle) {
1073                 lockh.cookie = it->d.lustre.it_lock_handle;
1074                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1075         } else {
1076                 fid_build_reg_res_name(fid, &res_id);
1077                 switch (it->it_op) {
1078                 case IT_GETATTR:
1079                         /* File attributes are held under multiple bits:
1080                          * nlink is under lookup lock, size and times are
1081                          * under UPDATE lock and recently we've also got
1082                          * a separate permissions lock for owner/group/acl that
1083                          * were protected by lookup lock before.
1084                          * Getattr must provide all of that information,
1085                          * so we need to ensure we have all of those locks.
1086                          * Unfortunately, if the bits are split across multiple
1087                          * locks, there's no easy way to match all of them here,
1088                          * so an extra RPC would be performed to fetch all
1089                          * of those bits at once for now. */
1090                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1091                          * but for old MDTs (< 2.4), permission is covered
1092                          * by LOOKUP lock, so it needs to match all bits here.*/
1093                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1094                                                   MDS_INODELOCK_LOOKUP |
1095                                                   MDS_INODELOCK_PERM;
1096                         break;
1097                 case IT_READDIR:
1098                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1099                         break;
1100                 case IT_LAYOUT:
1101                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1102                         break;
1103                 default:
1104                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1105                         break;
1106                 }
1107
1108                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1109                                       LDLM_IBITS, &policy,
1110                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1111                                       &lockh);
1112         }
1113
1114         if (mode) {
1115                 it->d.lustre.it_lock_handle = lockh.cookie;
1116                 it->d.lustre.it_lock_mode = mode;
1117         } else {
1118                 it->d.lustre.it_lock_handle = 0;
1119                 it->d.lustre.it_lock_mode = 0;
1120         }
1121
1122         RETURN(!!mode);
1123 }
1124
1125 /*
1126  * This long block is all about fixing up the lock and request state
1127  * so that it is correct as of the moment _before_ the operation was
1128  * applied; that way, the VFS will think that everything is normal and
1129  * call Lustre's regular VFS methods.
1130  *
1131  * If we're performing a creation, that means that unless the creation
1132  * failed with EEXIST, we should fake up a negative dentry.
1133  *
1134  * For everything else, we want to lookup to succeed.
1135  *
1136  * One additional note: if CREATE or OPEN succeeded, we add an extra
1137  * reference to the request because we need to keep it around until
1138  * ll_create/ll_open gets called.
1139  *
1140  * The server will return to us, in it_disposition, an indication of
1141  * exactly what d.lustre.it_status refers to.
1142  *
1143  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1144  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1145  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1146  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1147  * was successful.
1148  *
1149  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1150  * child lookup.
1151  */
1152 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1153                     void *lmm, int lmmsize, struct lookup_intent *it,
1154                     int lookup_flags, struct ptlrpc_request **reqp,
1155                     ldlm_blocking_callback cb_blocking,
1156                     __u64 extra_lock_flags)
1157 {
1158         struct ldlm_enqueue_info einfo = {
1159                 .ei_type        = LDLM_IBITS,
1160                 .ei_mode        = it_to_lock_mode(it),
1161                 .ei_cb_bl       = cb_blocking,
1162                 .ei_cb_cp       = ldlm_completion_ast,
1163         };
1164         struct lustre_handle lockh;
1165         int rc = 0;
1166         ENTRY;
1167         LASSERT(it);
1168
1169         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1170                 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1171                 op_data->op_name, PFID(&op_data->op_fid2),
1172                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1173                 it->it_flags);
1174
1175         lockh.cookie = 0;
1176         if (fid_is_sane(&op_data->op_fid2) &&
1177             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1178                 /* We could just return 1 immediately, but since we should only
1179                  * be called in revalidate_it if we already have a lock, let's
1180                  * verify that. */
1181                 it->d.lustre.it_lock_handle = 0;
1182                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1183                 /* Only return failure if it was not GETATTR by cfid
1184                    (from inode_revalidate) */
1185                 if (rc || op_data->op_namelen != 0)
1186                         RETURN(rc);
1187         }
1188
1189         /* For case if upper layer did not alloc fid, do it now. */
1190         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1191                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1192                 if (rc < 0) {
1193                         CERROR("Can't alloc new fid, rc %d\n", rc);
1194                         RETURN(rc);
1195                 }
1196         }
1197         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1198                          extra_lock_flags);
1199         if (rc < 0)
1200                 RETURN(rc);
1201
1202         *reqp = it->d.lustre.it_data;
1203         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1204         RETURN(rc);
1205 }
1206
1207 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1208                                               struct ptlrpc_request *req,
1209                                               void *args, int rc)
1210 {
1211         struct mdc_getattr_args  *ga = args;
1212         struct obd_export        *exp = ga->ga_exp;
1213         struct md_enqueue_info   *minfo = ga->ga_minfo;
1214         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1215         struct lookup_intent     *it;
1216         struct lustre_handle     *lockh;
1217         struct obd_device        *obddev;
1218         struct ldlm_reply        *lockrep;
1219         __u64                     flags = LDLM_FL_HAS_INTENT;
1220         ENTRY;
1221
1222         it    = &minfo->mi_it;
1223         lockh = &minfo->mi_lockh;
1224
1225         obddev = class_exp2obd(exp);
1226
1227         mdc_exit_request(&obddev->u.cli);
1228         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1229                 rc = -ETIMEDOUT;
1230
1231         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1232                                    &flags, NULL, 0, lockh, rc);
1233         if (rc < 0) {
1234                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1235                 mdc_clear_replay_flag(req, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1240         LASSERT(lockrep != NULL);
1241
1242         lockrep->lock_policy_res2 =
1243                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1244
1245         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1246         if (rc)
1247                 GOTO(out, rc);
1248
1249         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1250         EXIT;
1251
1252 out:
1253         OBD_FREE_PTR(einfo);
1254         minfo->mi_cb(req, minfo, rc);
1255         return 0;
1256 }
1257
1258 int mdc_intent_getattr_async(struct obd_export *exp,
1259                              struct md_enqueue_info *minfo,
1260                              struct ldlm_enqueue_info *einfo)
1261 {
1262         struct md_op_data       *op_data = &minfo->mi_data;
1263         struct lookup_intent    *it = &minfo->mi_it;
1264         struct ptlrpc_request   *req;
1265         struct mdc_getattr_args *ga;
1266         struct obd_device       *obddev = class_exp2obd(exp);
1267         struct ldlm_res_id       res_id;
1268         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1269          *     for statahead currently. Consider CMD in future, such two bits
1270          *     maybe managed by different MDS, should be adjusted then. */
1271         ldlm_policy_data_t       policy = {
1272                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1273                                                          MDS_INODELOCK_UPDATE }
1274                                  };
1275         int                      rc = 0;
1276         __u64                    flags = LDLM_FL_HAS_INTENT;
1277         ENTRY;
1278
1279         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1280                 LPF64"o\n",
1281                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1282                 ldlm_it2str(it->it_op), it->it_flags);
1283
1284         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1285         req = mdc_intent_getattr_pack(exp, it, op_data);
1286         if (IS_ERR(req))
1287                 RETURN(PTR_ERR(req));
1288
1289         rc = mdc_enter_request(&obddev->u.cli);
1290         if (rc != 0) {
1291                 ptlrpc_req_finished(req);
1292                 RETURN(rc);
1293         }
1294
1295         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1296                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297         if (rc < 0) {
1298                 mdc_exit_request(&obddev->u.cli);
1299                 ptlrpc_req_finished(req);
1300                 RETURN(rc);
1301         }
1302
1303         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1304         ga = ptlrpc_req_async_args(req);
1305         ga->ga_exp = exp;
1306         ga->ga_minfo = minfo;
1307         ga->ga_einfo = einfo;
1308
1309         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1310         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1311
1312         RETURN(0);
1313 }