Whamcloud - gitweb
LU-2675 lustre: move lustre_intent.h to lustre/include
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40 #include <obd.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
49
/*
 * Bundle of the three pointers an MDC getattr enqueue carries around.
 * NOTE(review): the users of this structure are outside the part of the
 * file visible here - confirm lifetime/ownership against them.
 */
struct mdc_getattr_args {
        /* export the enqueue was issued on */
        struct obd_export *ga_exp;
        /* metadata enqueue info */
        struct md_enqueue_info *ga_minfo;
        /* ldlm enqueue info */
        struct ldlm_enqueue_info *ga_einfo;
};
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_LEASE)) {
59                 if (phase >= DISP_OPEN_LEASE)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
100                       __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!*lockh)
110                 RETURN(0);
111
112         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137                            const struct lu_fid *fid, ldlm_type_t type,
138                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
139                            struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         ldlm_mode_t rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp,
154                       const struct lu_fid *fid,
155                       ldlm_policy_data_t *policy,
156                       ldlm_mode_t mode,
157                       ldlm_cancel_flags_t flags,
158                       void *opaque)
159 {
160         struct ldlm_res_id res_id;
161         struct obd_device *obd = class_exp2obd(exp);
162         int rc;
163
164         ENTRY;
165
166         fid_build_reg_res_name(fid, &res_id);
167         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168                                              policy, mode, flags, opaque);
169         RETURN(rc);
170 }
171
172 int mdc_null_inode(struct obd_export *exp,
173                    const struct lu_fid *fid)
174 {
175         struct ldlm_res_id res_id;
176         struct ldlm_resource *res;
177         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
178         ENTRY;
179
180         LASSERTF(ns != NULL, "no namespace passed\n");
181
182         fid_build_reg_res_name(fid, &res_id);
183
184         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185         if (IS_ERR(res))
186                 RETURN(0);
187
188         lock_res(res);
189         res->lr_lvb_inode = NULL;
190         unlock_res(res);
191
192         ldlm_resource_putref(res);
193         RETURN(0);
194 }
195
196 /* find any ldlm lock of the inode in mdc
197  * return 0    not find
198  *        1    find one
199  *      < 0    error */
200 int mdc_find_cbdata(struct obd_export *exp,
201                     const struct lu_fid *fid,
202                     ldlm_iterator_t it, void *data)
203 {
204         struct ldlm_res_id res_id;
205         int rc = 0;
206         ENTRY;
207
208         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
210                                    it, data);
211         if (rc == LDLM_ITER_STOP)
212                 RETURN(1);
213         else if (rc == LDLM_ITER_CONTINUE)
214                 RETURN(0);
215         RETURN(rc);
216 }
217
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
219 {
220         /* Don't hold error requests for replay. */
221         if (req->rq_replay) {
222                 spin_lock(&req->rq_lock);
223                 req->rq_replay = 0;
224                 spin_unlock(&req->rq_lock);
225         }
226         if (rc && req->rq_transno != 0) {
227                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
228                 LBUG();
229         }
230 }
231
232 /* Save a large LOV EA into the request buffer so that it is available
233  * for replay.  We don't do this in the initial request because the
234  * original request doesn't need this buffer (at most it sends just the
235  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236  * buffer and may also be difficult to allocate and save a very large
237  * request buffer for each open. (bug 5707)
238  *
239  * OOM here may cause recovery failure if lmm is needed (only for the
240  * original open if the MDS crashed just when this client also OOM'd)
241  * but this is incredibly unlikely, and questionable whether the client
242  * could do MDS recovery under OOM anyways... */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244                                 struct mdt_body *body)
245 {
246         int     rc;
247
248         /* FIXME: remove this explicit offset. */
249         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250                                         body->mbo_eadatasize);
251         if (rc) {
252                 CERROR("Can't enlarge segment %d size to %d\n",
253                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255                 body->mbo_eadatasize = 0;
256         }
257 }
258
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261                      struct md_op_data *op_data)
262 {
263         struct ptlrpc_request   *req;
264         struct obd_device       *obddev = class_exp2obd(exp);
265         struct ldlm_intent      *lit;
266         const void              *lmm = op_data->op_data;
267         int                      lmmsize = op_data->op_data_size;
268         struct list_head         cancels = LIST_HEAD_INIT(cancels);
269         int                      count = 0;
270         int                      mode;
271         int                      rc;
272         ENTRY;
273
274         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
275
276         /* XXX: openlock is not cancelled for cross-refs. */
277         /* If inode is known, cancel conflicting OPEN locks. */
278         if (fid_is_sane(&op_data->op_fid2)) {
279                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280                         if (it->it_flags & FMODE_WRITE)
281                                 mode = LCK_EX;
282                         else
283                                 mode = LCK_PR;
284                 } else {
285                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
286                                 mode = LCK_CW;
287 #ifdef FMODE_EXEC
288                         else if (it->it_flags & FMODE_EXEC)
289                                 mode = LCK_PR;
290 #endif
291                         else
292                                 mode = LCK_CR;
293                 }
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         /* parent capability */
316         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317         /* child capability, reserve the size according to parent capa, it will
318          * be filled after we get the reply */
319         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
320
321         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322                              op_data->op_namelen + 1);
323         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
325
326         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
327         if (rc < 0) {
328                 ptlrpc_request_free(req);
329                 RETURN(ERR_PTR(rc));
330         }
331
332         spin_lock(&req->rq_lock);
333         req->rq_replay = req->rq_import->imp_replayable;
334         spin_unlock(&req->rq_lock);
335
336         /* pack the intent */
337         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338         lit->opc = (__u64)it->it_op;
339
340         /* pack the intended request */
341         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342                       lmmsize);
343
344         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
345                              obddev->u.cli.cl_max_mds_easize);
346
347         /* for remote client, fetch remote perm for current user */
348         if (client_is_remote(exp))
349                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
350                                      sizeof(struct mdt_remote_perm));
351         ptlrpc_request_set_replen(req);
352         return req;
353 }
354
355 static struct ptlrpc_request *
356 mdc_intent_getxattr_pack(struct obd_export *exp,
357                          struct lookup_intent *it,
358                          struct md_op_data *op_data)
359 {
360         struct ptlrpc_request   *req;
361         struct ldlm_intent      *lit;
362         int                     rc, count = 0, maxdata;
363         struct list_head        cancels = LIST_HEAD_INIT(cancels);
364
365         ENTRY;
366
367         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
368                                         &RQF_LDLM_INTENT_GETXATTR);
369         if (req == NULL)
370                 RETURN(ERR_PTR(-ENOMEM));
371
372         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
373
374         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 RETURN(ERR_PTR(rc));
378         }
379
380         /* pack the intent */
381         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
382         lit->opc = IT_GETXATTR;
383
384         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
385
386         /* pack the intended request */
387         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
388                         op_data->op_valid, maxdata, -1, 0);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
391                                 RCL_SERVER, maxdata);
392
393         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
394                                 RCL_SERVER, maxdata);
395
396         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
397                                 RCL_SERVER, maxdata);
398
399         ptlrpc_request_set_replen(req);
400
401         RETURN(req);
402 }
403
404 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
405                                                      struct lookup_intent *it,
406                                                      struct md_op_data *op_data)
407 {
408         struct ptlrpc_request *req;
409         struct obd_device     *obddev = class_exp2obd(exp);
410         struct ldlm_intent    *lit;
411         int                    rc;
412         ENTRY;
413
414         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
415                                    &RQF_LDLM_INTENT_UNLINK);
416         if (req == NULL)
417                 RETURN(ERR_PTR(-ENOMEM));
418
419         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
420         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
421                              op_data->op_namelen + 1);
422
423         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
424         if (rc) {
425                 ptlrpc_request_free(req);
426                 RETURN(ERR_PTR(rc));
427         }
428
429         /* pack the intent */
430         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
431         lit->opc = (__u64)it->it_op;
432
433         /* pack the intended request */
434         mdc_unlink_pack(req, op_data);
435
436         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437                              obddev->u.cli.cl_default_mds_easize);
438         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
439                              obddev->u.cli.cl_default_mds_cookiesize);
440         ptlrpc_request_set_replen(req);
441         RETURN(req);
442 }
443
444 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
445                                                       struct lookup_intent *it,
446                                                       struct md_op_data *op_data)
447 {
448         struct ptlrpc_request *req;
449         struct obd_device     *obddev = class_exp2obd(exp);
450         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
451                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
452                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
453                                        (client_is_remote(exp) ?
454                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
455         struct ldlm_intent    *lit;
456         int                    rc;
457         int                     easize;
458         ENTRY;
459
460         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
461                                    &RQF_LDLM_INTENT_GETATTR);
462         if (req == NULL)
463                 RETURN(ERR_PTR(-ENOMEM));
464
465         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
466         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
467                              op_data->op_namelen + 1);
468
469         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
470         if (rc) {
471                 ptlrpc_request_free(req);
472                 RETURN(ERR_PTR(rc));
473         }
474
475         /* pack the intent */
476         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
477         lit->opc = (__u64)it->it_op;
478
479         if (obddev->u.cli.cl_default_mds_easize > 0)
480                 easize = obddev->u.cli.cl_default_mds_easize;
481         else
482                 easize = obddev->u.cli.cl_max_mds_easize;
483
484         /* pack the intended request */
485         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
486
487         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
488         if (client_is_remote(exp))
489                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
490                                      sizeof(struct mdt_remote_perm));
491         ptlrpc_request_set_replen(req);
492         RETURN(req);
493 }
494
495 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
496                                                      struct lookup_intent *it,
497                                                      struct md_op_data *unused)
498 {
499         struct obd_device     *obd = class_exp2obd(exp);
500         struct ptlrpc_request *req;
501         struct ldlm_intent    *lit;
502         struct layout_intent  *layout;
503         int rc;
504         ENTRY;
505
506         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
507                                 &RQF_LDLM_INTENT_LAYOUT);
508         if (req == NULL)
509                 RETURN(ERR_PTR(-ENOMEM));
510
511         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
512         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
513         if (rc) {
514                 ptlrpc_request_free(req);
515                 RETURN(ERR_PTR(rc));
516         }
517
518         /* pack the intent */
519         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
520         lit->opc = (__u64)it->it_op;
521
522         /* pack the layout intent request */
523         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
524         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
525          * set for replication */
526         layout->li_opc = LAYOUT_INTENT_ACCESS;
527
528         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
529                              obd->u.cli.cl_default_mds_easize);
530         ptlrpc_request_set_replen(req);
531         RETURN(req);
532 }
533
534 static struct ptlrpc_request *
535 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
536 {
537         struct ptlrpc_request *req;
538         int rc;
539         ENTRY;
540
541         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
542         if (req == NULL)
543                 RETURN(ERR_PTR(-ENOMEM));
544
545         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
546         if (rc) {
547                 ptlrpc_request_free(req);
548                 RETURN(ERR_PTR(rc));
549         }
550
551         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
552         ptlrpc_request_set_replen(req);
553         RETURN(req);
554 }
555
556 static int mdc_finish_enqueue(struct obd_export *exp,
557                               struct ptlrpc_request *req,
558                               struct ldlm_enqueue_info *einfo,
559                               struct lookup_intent *it,
560                               struct lustre_handle *lockh,
561                               int rc)
562 {
563         struct req_capsule  *pill = &req->rq_pill;
564         struct ldlm_request *lockreq;
565         struct ldlm_reply   *lockrep;
566         struct lustre_intent_data *intent = &it->d.lustre;
567         struct ldlm_lock    *lock;
568         void                *lvb_data = NULL;
569         int                  lvb_len = 0;
570         ENTRY;
571
572         LASSERT(rc >= 0);
573         /* Similarly, if we're going to replay this request, we don't want to
574          * actually get a lock, just perform the intent. */
575         if (req->rq_transno || req->rq_replay) {
576                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
577                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
578         }
579
580         if (rc == ELDLM_LOCK_ABORTED) {
581                 einfo->ei_mode = 0;
582                 memset(lockh, 0, sizeof(*lockh));
583                 rc = 0;
584         } else { /* rc = 0 */
585                 lock = ldlm_handle2lock(lockh);
586                 LASSERT(lock != NULL);
587
588                 /* If the server gave us back a different lock mode, we should
589                  * fix up our variables. */
590                 if (lock->l_req_mode != einfo->ei_mode) {
591                         ldlm_lock_addref(lockh, lock->l_req_mode);
592                         ldlm_lock_decref(lockh, einfo->ei_mode);
593                         einfo->ei_mode = lock->l_req_mode;
594                 }
595                 LDLM_LOCK_PUT(lock);
596         }
597
598         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
599         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
600
601         intent->it_disposition = (int)lockrep->lock_policy_res1;
602         intent->it_status = (int)lockrep->lock_policy_res2;
603         intent->it_lock_mode = einfo->ei_mode;
604         intent->it_lock_handle = lockh->cookie;
605         intent->it_data = req;
606
607         /* Technically speaking rq_transno must already be zero if
608          * it_status is in error, so the check is a bit redundant */
609         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
610                 mdc_clear_replay_flag(req, intent->it_status);
611
612         /* If we're doing an IT_OPEN which did not result in an actual
613          * successful open, then we need to remove the bit which saves
614          * this request for unconditional replay.
615          *
616          * It's important that we do this first!  Otherwise we might exit the
617          * function without doing so, and try to replay a failed create
618          * (bug 3440) */
619         if (it->it_op & IT_OPEN && req->rq_replay &&
620             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
621                 mdc_clear_replay_flag(req, intent->it_status);
622
623         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
624                   it->it_op, intent->it_disposition, intent->it_status);
625
626         /* We know what to expect, so we do any byte flipping required here */
627         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
628                 struct mdt_body *body;
629
630                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
631                 if (body == NULL) {
632                         CERROR ("Can't swab mdt_body\n");
633                         RETURN (-EPROTO);
634                 }
635
636                 if (it_disposition(it, DISP_OPEN_OPEN) &&
637                     !it_open_error(DISP_OPEN_OPEN, it)) {
638                         /*
639                          * If this is a successful OPEN request, we need to set
640                          * replay handler and data early, so that if replay
641                          * happens immediately after swabbing below, new reply
642                          * is swabbed by that handler correctly.
643                          */
644                         mdc_set_open_replay_data(NULL, NULL, it);
645                 }
646
647                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
648                         void *eadata;
649
650                         mdc_update_max_ea_from_body(exp, body);
651
652                         /*
653                          * The eadata is opaque; just check that it is there.
654                          * Eventually, obd_unpackmd() will check the contents.
655                          */
656                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
657                                                         body->mbo_eadatasize);
658                         if (eadata == NULL)
659                                 RETURN(-EPROTO);
660
661                         /* save lvb data and length in case this is for layout
662                          * lock */
663                         lvb_data = eadata;
664                         lvb_len = body->mbo_eadatasize;
665
666                         /*
667                          * We save the reply LOV EA in case we have to replay a
668                          * create for recovery.  If we didn't allocate a large
669                          * enough request buffer above we need to reallocate it
670                          * here to hold the actual LOV EA.
671                          *
672                          * To not save LOV EA if request is not going to replay
673                          * (for example error one).
674                          */
675                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
676                                 void *lmm;
677                                 if (req_capsule_get_size(pill, &RMF_EADATA,
678                                                          RCL_CLIENT) <
679                                     body->mbo_eadatasize)
680                                         mdc_realloc_openmsg(req, body);
681                                 else
682                                         req_capsule_shrink(pill, &RMF_EADATA,
683                                                            body->mbo_eadatasize,
684                                                            RCL_CLIENT);
685
686                                 req_capsule_set_size(pill, &RMF_EADATA,
687                                                      RCL_CLIENT,
688                                                      body->mbo_eadatasize);
689
690                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
691                                 if (lmm)
692                                         memcpy(lmm, eadata,
693                                                body->mbo_eadatasize);
694                         }
695                 }
696
697                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
698                         struct mdt_remote_perm *perm;
699
700                         LASSERT(client_is_remote(exp));
701                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
702                                                 lustre_swab_mdt_remote_perm);
703                         if (perm == NULL)
704                                 RETURN(-EPROTO);
705                 }
706                 if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
707                         struct lustre_capa *capa, *p;
708
709                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
710                         if (capa == NULL)
711                                 RETURN(-EPROTO);
712
713                         if (it->it_op & IT_OPEN) {
714                                 /* client fid capa will be checked in replay */
715                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
716                                 LASSERT(p);
717                                 *p = *capa;
718                         }
719                 }
720                 if (body->mbo_valid & OBD_MD_FLOSSCAPA) {
721                         struct lustre_capa *capa;
722
723                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
724                         if (capa == NULL)
725                                 RETURN(-EPROTO);
726                 }
727         } else if (it->it_op & IT_LAYOUT) {
728                 /* maybe the lock was granted right away and layout
729                  * is packed into RMF_DLM_LVB of req */
730                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
731                 if (lvb_len > 0) {
732                         lvb_data = req_capsule_server_sized_get(pill,
733                                                         &RMF_DLM_LVB, lvb_len);
734                         if (lvb_data == NULL)
735                                 RETURN(-EPROTO);
736                 }
737         }
738
739         /* fill in stripe data for layout lock */
740         lock = ldlm_handle2lock(lockh);
741         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
742                 void *lmm;
743
744                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
745                         ldlm_it2str(it->it_op), lvb_len);
746
747                 OBD_ALLOC_LARGE(lmm, lvb_len);
748                 if (lmm == NULL) {
749                         LDLM_LOCK_PUT(lock);
750                         RETURN(-ENOMEM);
751                 }
752                 memcpy(lmm, lvb_data, lvb_len);
753
754                 /* install lvb_data */
755                 lock_res_and_lock(lock);
756                 if (lock->l_lvb_data == NULL) {
757                         lock->l_lvb_type = LVB_T_LAYOUT;
758                         lock->l_lvb_data = lmm;
759                         lock->l_lvb_len = lvb_len;
760                         lmm = NULL;
761                 }
762                 unlock_res_and_lock(lock);
763                 if (lmm != NULL)
764                         OBD_FREE_LARGE(lmm, lvb_len);
765         }
766         if (lock != NULL)
767                 LDLM_LOCK_PUT(lock);
768
769         RETURN(rc);
770 }
771
772 /* We always reserve enough space in the reply packet for a stripe MD, because
773  * we don't know in advance the file type. */
/*
 * Enqueue an LDLM lock on the resource built from op_data->op_fid1,
 * optionally carrying an intent.
 *
 * - it != NULL: einfo->ei_type must be LDLM_IBITS and policy must be NULL;
 *   the inodebits policy is selected here from it->it_op, the request is
 *   built by the matching pack helper, and the reply is post-processed by
 *   mdc_finish_enqueue().
 * - it == NULL: einfo->ei_type must be LDLM_FLOCK; no request is packed
 *   here (req is still NULL when handed to ldlm_cli_enqueue()) and the
 *   result of ldlm_cli_enqueue() is returned directly, except that
 *   -EINTR/-ETIMEDOUT on an LCK_NL request is retried.
 *
 * For IT_CREAT intents, a reply whose lock_policy_res2 is -EINPROGRESS makes
 * this function drop the request and jump back to "resend" while the import
 * generation is unchanged; if the generation changed, -EIO is returned.
 *
 * Returns the rc of mdc_finish_enqueue() on the intent path, or a negative
 * errno from an earlier step.  When mdc_finish_enqueue() fails, the lock
 * reference is dropped (if lockh is in use), lockh is zeroed, the request is
 * released and the lock-related fields of the intent are cleared.
 */
774 int mdc_enqueue(struct obd_export *exp,
775                 struct ldlm_enqueue_info *einfo,
776                 const union ldlm_policy_data *policy,
777                 struct lookup_intent *it, struct md_op_data *op_data,
778                 struct lustre_handle *lockh, __u64 extra_lock_flags)
779 {
780         struct obd_device     *obddev = class_exp2obd(exp);
781         struct ptlrpc_request *req = NULL;
782         __u64                  flags, saved_flags = extra_lock_flags;
783         int                    rc;
784         struct ldlm_res_id res_id;
785         static const ldlm_policy_data_t lookup_policy =
786                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
787         static const ldlm_policy_data_t update_policy =
788                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
789         static const ldlm_policy_data_t layout_policy =
790                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
791         static const ldlm_policy_data_t getxattr_policy = {
792                               .l_inodebits = { MDS_INODELOCK_XATTR } };
793         int                    generation, resends = 0;
794         struct ldlm_reply     *lockrep;
795         enum lvb_type          lvb_type = 0;
796         ENTRY;
797
798         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
799                  einfo->ei_type);
800         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
801
802         if (it != NULL) {
803                 LASSERT(policy == NULL);
804
805                 saved_flags |= LDLM_FL_HAS_INTENT;
                    /* Pick the inodebits policy from the intent operation.
                     * NOTE(review): IT_SETXATTR is accepted here, but the
                     * pack chain after "resend:" has no IT_SETXATTR branch,
                     * so an intent with only IT_SETXATTR set would reach
                     * LBUG() there -- confirm no caller passes it. */
806                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
807                         policy = &update_policy;
808                 else if (it->it_op & IT_LAYOUT)
809                         policy = &layout_policy;
810                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
811                         policy = &getxattr_policy;
812                 else
813                         policy = &lookup_policy;
814         }
815
            /* Remember the import generation so a resend can detect that the
             * import generation changed in between (see the -EINPROGRESS
             * handling below). */
816         generation = obddev->u.cli.cl_import->imp_generation;
817 resend:
            /* flags is passed by address to ldlm_cli_enqueue(), so start every
             * attempt from the saved value. */
818         flags = saved_flags;
819         if (it == NULL) {
820                 /* The only way right now is FLOCK. */
821                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
822                          einfo->ei_type);
823                 res_id.name[3] = LDLM_FLOCK;
824         } else if (it->it_op & IT_OPEN) {
825                 req = mdc_intent_open_pack(exp, it, op_data);
826         } else if (it->it_op & IT_UNLINK) {
827                 req = mdc_intent_unlink_pack(exp, it, op_data);
828         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
829                 req = mdc_intent_getattr_pack(exp, it, op_data);
830         } else if (it->it_op & IT_READDIR) {
831                 req = mdc_enqueue_pack(exp, 0);
832         } else if (it->it_op & IT_LAYOUT) {
833                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
834                         RETURN(-EOPNOTSUPP);
835                 req = mdc_intent_layout_pack(exp, it, op_data);
836                 lvb_type = LVB_T_LAYOUT;
837         } else if (it->it_op & IT_GETXATTR) {
838                 req = mdc_intent_getxattr_pack(exp, it, op_data);
839         } else {
840                 LBUG();
841                 RETURN(-EINVAL);
842         }
843
844         if (IS_ERR(req))
845                 RETURN(PTR_ERR(req));
846
847         if (req != NULL && it && it->it_op & IT_CREAT)
848                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
849                  * retry logic */
850                 req->rq_no_retry_einprogress = 1;
851
            /* resends is only incremented on the IT_CREAT/-EINPROGRESS path
             * below, so this block only runs for a re-packed intent request.
             * rq_sent is set to "now + resends" seconds -- presumably this
             * delays each retry a little longer; confirm against ptlrpc. */
852         if (resends) {
853                 req->rq_generation_set = 1;
854                 req->rq_import_generation = generation;
855                 req->rq_sent = cfs_time_current_sec() + resends;
856         }
857
858         /* It is important to obtain rpc_lock first (if applicable), so that
859          * threads that are serialised with rpc_lock are not polluting our
860          * rpcs in flight counter. We do not do flock request limiting, though*/
861         if (it) {
862                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
863                 rc = obd_get_request_slot(&obddev->u.cli);
864                 if (rc != 0) {
865                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
866                         mdc_clear_replay_flag(req, 0);
867                         ptlrpc_req_finished(req);
868                         RETURN(rc);
869                 }
870         }
871
872         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
873                               0, lvb_type, lockh, 0);
874         if (!it) {
875                 /* For flock requests we immediately return without further
876                    delay and let caller deal with the rest, since rest of
877                    this function metadata processing makes no sense for flock
878                    requests anyway. But in case of problem during comms with
879                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
880                    can not rely on caller and this mainly for F_UNLCKs
881                    (explicits or automatically generated by Kernel to clean
882                    current FLocks upon exit) that can't be trashed */
883                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
884                     (einfo->ei_type == LDLM_FLOCK) &&
885                     (einfo->ei_mode == LCK_NL))
886                         goto resend;
887                 RETURN(rc);
888         }
889
            /* Intent path only: give back the request slot and the rpc_lock
             * taken before ldlm_cli_enqueue(). */
890         obd_put_request_slot(&obddev->u.cli);
891         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
892
893         if (rc < 0) {
894                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
895                        obddev->obd_name, rc);
896
897                 mdc_clear_replay_flag(req, rc);
898                 ptlrpc_req_finished(req);
899                 RETURN(rc);
900         }
901
902         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
903         LASSERT(lockrep != NULL);
904
            /* Convert the status carried in lock_policy_res2 in place, so the
             * comparison below and later readers of the reply see the
             * converted value. */
905         lockrep->lock_policy_res2 =
906                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
907
908         /* Retry the create infinitely when we get -EINPROGRESS from
909          * server. This is required by the new quota design. */
910         if (it && it->it_op & IT_CREAT &&
911             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
912                 mdc_clear_replay_flag(req, rc);
913                 ptlrpc_req_finished(req);
914                 resends++;
915
916                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
917                        obddev->obd_name, resends, it->it_op,
918                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
919
920                 if (generation == obddev->u.cli.cl_import->imp_generation) {
921                         goto resend;
922                 } else {
923                         CDEBUG(D_HA, "resend cross eviction\n");
924                         RETURN(-EIO);
925                 }
926         }
927
928         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
929         if (rc < 0) {
                    /* Post-processing failed: drop our lock reference if a
                     * handle was filled in, release the request and clear the
                     * intent's lock/request fields. */
930                 if (lustre_handle_is_used(lockh)) {
931                         ldlm_lock_decref(lockh, einfo->ei_mode);
932                         memset(lockh, 0, sizeof(*lockh));
933                 }
934                 ptlrpc_req_finished(req);
935
936                 it->d.lustre.it_lock_handle = 0;
937                 it->d.lustre.it_lock_mode = 0;
938                 it->d.lustre.it_data = NULL;
939         }
940
941         RETURN(rc);
942 }
943
/*
 * Post-process the reply of an intent enqueue.
 *
 * - IT_READDIR intents return 0 immediately.
 * - If DISP_IT_EXECD is not set, it_status (asserted non-zero) is returned.
 * - Errors reported by it_open_error() for DISP_IT_EXECD and
 *   DISP_LOOKUP_EXECD are returned as-is.
 * - For a successful create and/or open disposition an extra reference is
 *   taken on the request, guarded by DISP_ENQ_CREATE_REF/DISP_ENQ_OPEN_REF so
 *   each is taken at most once per intent.
 * - If lockh resolves to a lock and ldlm_lock_match() finds a matching
 *   lock, the new lock is dropped and cancelled, and lockh and
 *   it_lock_handle are switched to the matched handle.
 *
 * The exp parameter is not referenced in this function.
 *
 * Returns 0 on success or the error reported by the checks above.
 */
944 static int mdc_finish_intent_lock(struct obd_export *exp,
945                                   struct ptlrpc_request *request,
946                                   struct md_op_data *op_data,
947                                   struct lookup_intent *it,
948                                   struct lustre_handle *lockh)
949 {
950         struct lustre_handle old_lock;
951         struct mdt_body *mdt_body;
952         struct ldlm_lock *lock;
953         int rc;
954         ENTRY;
955
956         LASSERT(request != NULL);
957         LASSERT(request != LP_POISON);
958         LASSERT(request->rq_repmsg != LP_POISON);
959
960         if (it->it_op & IT_READDIR)
961                 RETURN(0);
962
963         if (!it_disposition(it, DISP_IT_EXECD)) {
964                 /* The server failed before it even started executing the
965                  * intent, i.e. because it couldn't unpack the request. */
966                 LASSERT(it->d.lustre.it_status != 0);
967                 RETURN(it->d.lustre.it_status);
968         }
969         rc = it_open_error(DISP_IT_EXECD, it);
970         if (rc)
971                 RETURN(rc);
972
973         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
974         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
975
976         rc = it_open_error(DISP_LOOKUP_EXECD, it);
977         if (rc)
978                 RETURN(rc);
979
980         /* keep requests around for the multiple phases of the call
981          * this shows the DISP_XX must guarantee we make it into the call
982          */
983         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
984             it_disposition(it, DISP_OPEN_CREATE) &&
985             !it_open_error(DISP_OPEN_CREATE, it)) {
986                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
987                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
988         }
989         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
990             it_disposition(it, DISP_OPEN_OPEN) &&
991             !it_open_error(DISP_OPEN_OPEN, it)) {
992                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
993                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
994                 /* BUG 11546 - eviction in the middle of open rpc processing */
995                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
996         }
997
            /* Sanity checks only: the branches below change no state beyond
             * asserting which dispositions/operations are expected here. */
998         if (it->it_op & IT_CREAT) {
999                 /* XXX this belongs in ll_create_it */
1000         } else if (it->it_op == IT_OPEN) {
1001                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1002         } else {
1003                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1004         }
1005
1006         /* If we already have a matching lock, then cancel the new
1007          * one.  We have to set the data here instead of in
1008          * mdc_enqueue, because we need to use the child's inode as
1009          * the l_ast_data to match, and that's not available until
1010          * intent_finish has performed the iget().) */
1011         lock = ldlm_handle2lock(lockh);
1012         if (lock) {
                     /* Copy the policy out of the lock before the reference
                      * is dropped below; the copy is what gets matched. */
1013                 ldlm_policy_data_t policy = lock->l_policy_data;
1014                 LDLM_DEBUG(lock, "matching against this");
1015
1016                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
1017                                          &lock->l_resource->lr_name),
1018                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1019                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
1020                 LDLM_LOCK_PUT(lock);
1021
                     /* old_lock starts as a copy of the new handle; presumably
                      * ldlm_lock_match() overwrites it with the matched lock's
                      * handle on success (confirm in ldlm), since it is then
                      * copied back over lockh after the new lock is dropped. */
1022                 memcpy(&old_lock, lockh, sizeof(*lockh));
1023                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1024                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1025                         ldlm_lock_decref_and_cancel(lockh,
1026                                                     it->d.lustre.it_lock_mode);
1027                         memcpy(lockh, &old_lock, sizeof(old_lock));
1028                         it->d.lustre.it_lock_handle = lockh->cookie;
1029                 }
1030         }
             /* rc is 0 here: every non-zero rc above has already been
              * returned. */
1031         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1032                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1033                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1034         RETURN(rc);
1035 }
1036
/*
 * Check whether a lock covering the intent is held and record it in the
 * intent.
 *
 * If it->d.lustre.it_lock_handle is already set, that handle is revalidated
 * with ldlm_revalidate_lock_handle() (bits is forwarded to it).  Otherwise a
 * lock is looked up by fid with mdc_lock_match(), using inodebits chosen from
 * it->it_op and any of the CR/CW/PR/PW modes; bits is not touched on this
 * path.
 *
 * On success the lock cookie and mode are stored in the intent; otherwise
 * both fields are zeroed.
 *
 * Returns 1 if a lock was found (mode != 0), 0 otherwise.
 */
1037 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1038                         struct lu_fid *fid, __u64 *bits)
1039 {
1040         /* We could just return 1 immediately, but since we should only
1041          * be called in revalidate_it if we already have a lock, let's
1042          * verify that. */
1043         struct ldlm_res_id res_id;
1044         struct lustre_handle lockh;
1045         ldlm_policy_data_t policy;
1046         ldlm_mode_t mode;
1047         ENTRY;
1048
1049         if (it->d.lustre.it_lock_handle) {
1050                 lockh.cookie = it->d.lustre.it_lock_handle;
1051                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1052         } else {
                     /* NOTE(review): res_id is filled in here but not used
                      * anywhere else in this function; mdc_lock_match() below
                      * is given the fid directly.  Looks like a leftover --
                      * confirm before removing. */
1053                 fid_build_reg_res_name(fid, &res_id);
1054                 switch (it->it_op) {
1055                 case IT_GETATTR:
1056                         /* File attributes are held under multiple bits:
1057                          * nlink is under lookup lock, size and times are
1058                          * under UPDATE lock and recently we've also got
1059                          * a separate permissions lock for owner/group/acl that
1060                          * were protected by lookup lock before.
1061                          * Getattr must provide all of that information,
1062                          * so we need to ensure we have all of those locks.
1063                          * Unfortunately, if the bits are split across multiple
1064                          * locks, there's no easy way to match all of them here,
1065                          * so an extra RPC would be performed to fetch all
1066                          * of those bits at once for now. */
1067                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1068                          * but for old MDTs (< 2.4), permission is covered
1069                          * by LOOKUP lock, so it needs to match all bits here.*/
1070                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1071                                                   MDS_INODELOCK_LOOKUP |
1072                                                   MDS_INODELOCK_PERM;
1073                         break;
1074                 case IT_READDIR:
1075                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1076                         break;
1077                 case IT_LAYOUT:
1078                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1079                         break;
1080                 default:
1081                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1082                         break;
1083                 }
1084
1085                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1086                                       LDLM_IBITS, &policy,
1087                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1088                                       &lockh);
1089         }
1090
             /* Publish the result in the intent: handle + mode on a hit,
              * zeroes on a miss. */
1091         if (mode) {
1092                 it->d.lustre.it_lock_handle = lockh.cookie;
1093                 it->d.lustre.it_lock_mode = mode;
1094         } else {
1095                 it->d.lustre.it_lock_handle = 0;
1096                 it->d.lustre.it_lock_mode = 0;
1097         }
1098
1099         RETURN(!!mode);
1100 }
1101
1102 /*
1103  * This long block is all about fixing up the lock and request state
1104  * so that it is correct as of the moment _before_ the operation was
1105  * applied; that way, the VFS will think that everything is normal and
1106  * call Lustre's regular VFS methods.
1107  *
1108  * If we're performing a creation, that means that unless the creation
1109  * failed with EEXIST, we should fake up a negative dentry.
1110  *
1111  * For everything else, we want to lookup to succeed.
1112  *
1113  * One additional note: if CREATE or OPEN succeeded, we add an extra
1114  * reference to the request because we need to keep it around until
1115  * ll_create/ll_open gets called.
1116  *
1117  * The server will return to us, in it_disposition, an indication of
1118  * exactly what d.lustre.it_status refers to.
1119  *
1120  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1121  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1122  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1123  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1124  * was successful.
1125  *
1126  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1127  * child lookup.
1128  */
/*
 * Control flow as implemented below:
 *
 * 1. If op_data->op_fid2 is sane and the intent is LOOKUP, GETATTR or
 *    READDIR, try mdc_revalidate_lock() first.  Its result is returned
 *    directly when it is non-zero (a lock was found) or when
 *    op_data->op_namelen != 0; *reqp is not written on this path.
 * 2. For IT_CREAT with a non-sane op_fid2, allocate a fid with
 *    mdc_fid_alloc(); a negative result is returned.
 * 3. Enqueue with mdc_enqueue() using an LDLM_IBITS einfo built from the
 *    intent; a negative result is returned and *reqp is left untouched.
 * 4. Store it->d.lustre.it_data in *reqp and return the result of
 *    mdc_finish_intent_lock().
 *
 * NOTE(review): LASSERT(it) runs after einfo's initializer has already
 * called it_to_lock_mode(it).  If that helper dereferences it (not visible
 * here), the assertion cannot catch a NULL intent -- confirm.
 */
1129 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1130                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1131                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1132 {
1133         struct ldlm_enqueue_info einfo = {
1134                 .ei_type        = LDLM_IBITS,
1135                 .ei_mode        = it_to_lock_mode(it),
1136                 .ei_cb_bl       = cb_blocking,
1137                 .ei_cb_cp       = ldlm_completion_ast,
1138         };
1139         struct lustre_handle lockh;
1140         int rc = 0;
1141         ENTRY;
1142         LASSERT(it);
1143
1144         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1145                 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1146                 op_data->op_name, PFID(&op_data->op_fid2),
1147                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1148                 it->it_flags);
1149
1150         lockh.cookie = 0;
1151         if (fid_is_sane(&op_data->op_fid2) &&
1152             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1153                 /* We could just return 1 immediately, but since we should only
1154                  * be called in revalidate_it if we already have a lock, let's
1155                  * verify that. */
                     /* Clearing the handle makes mdc_revalidate_lock() take
                      * its match-by-fid branch, which does not use the bits
                      * argument (NULL here). */
1156                 it->d.lustre.it_lock_handle = 0;
1157                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1158                 /* Only return failure if it was not GETATTR by cfid
1159                    (from inode_revalidate) */
1160                 if (rc || op_data->op_namelen != 0)
1161                         RETURN(rc);
1162         }
1163
1164         /* For case if upper layer did not alloc fid, do it now. */
1165         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1166                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1167                 if (rc < 0) {
1168                         CERROR("Can't alloc new fid, rc %d\n", rc);
1169                         RETURN(rc);
1170                 }
1171         }
1172
1173         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1174                          extra_lock_flags);
1175         if (rc < 0)
1176                 RETURN(rc);
1177
1178         *reqp = it->d.lustre.it_data;
1179         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1180         RETURN(rc);
1181 }
1182
/*
 * Reply interpreter installed on the request by mdc_intent_getattr_async().
 *
 * args is the request's async-args area holding a struct mdc_getattr_args
 * (export, md_enqueue_info, ldlm_enqueue_info).  The function returns the
 * request slot, completes the enqueue with ldlm_cli_enqueue_fini(), then runs
 * mdc_finish_enqueue() and mdc_finish_intent_lock() on the intent stored in
 * minfo.  On every path it frees einfo and reports the outcome through
 * minfo->mi_cb(req, minfo, rc).
 *
 * Always returns 0; the actual status is only delivered to the callback.
 */
1183 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1184                                               struct ptlrpc_request *req,
1185                                               void *args, int rc)
1186 {
1187         struct mdc_getattr_args  *ga = args;
1188         struct obd_export        *exp = ga->ga_exp;
1189         struct md_enqueue_info   *minfo = ga->ga_minfo;
1190         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1191         struct lookup_intent     *it;
1192         struct lustre_handle     *lockh;
1193         struct obd_device        *obddev;
1194         struct ldlm_reply        *lockrep;
1195         __u64                     flags = LDLM_FL_HAS_INTENT;
1196         ENTRY;
1197
1198         it    = &minfo->mi_it;
1199         lockh = &minfo->mi_lockh;
1200
1201         obddev = class_exp2obd(exp);
1202
             /* Balances the obd_get_request_slot() taken in
              * mdc_intent_getattr_async() before the request was queued. */
1203         obd_put_request_slot(&obddev->u.cli);
             /* Presumably a fault-injection hook: when the check fires, the
              * incoming rc is replaced with -ETIMEDOUT. */
1204         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1205                 rc = -ETIMEDOUT;
1206
1207         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1208                                    &flags, NULL, 0, lockh, rc);
1209         if (rc < 0) {
1210                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1211                 mdc_clear_replay_flag(req, rc);
1212                 GOTO(out, rc);
1213         }
1214
1215         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1216         LASSERT(lockrep != NULL);
1217
             /* Same in-place status conversion as done in mdc_enqueue(). */
1218         lockrep->lock_policy_res2 =
1219                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1220
1221         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1222         if (rc)
1223                 GOTO(out, rc);
1224
1225         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1226         EXIT;
1227
1228 out:
             /* einfo is freed here on both success and failure; the callback
              * receives the request, minfo and the final rc. */
1229         OBD_FREE_PTR(einfo);
1230         minfo->mi_cb(req, minfo, rc);
1231         return 0;
1232 }
1233
/*
 * Start an asynchronous intent-getattr enqueue.
 *
 * Packs the request with mdc_intent_getattr_pack() for the intent and op
 * data embedded in minfo, takes a request slot, and calls ldlm_cli_enqueue()
 * with its last argument set to 1 (mdc_enqueue() passes 0 there) and a
 * LOOKUP|UPDATE inodebits policy.  It then stores exp/minfo/einfo in the
 * request's async args, installs mdc_intent_getattr_async_interpret() as the
 * reply interpreter and hands the request to ptlrpcd.
 *
 * Returns 0 once the request has been queued, or a negative errno.  On the
 * error paths here the request is released (and the request slot returned if
 * it had been taken); einfo is not freed by this function -- it is freed by
 * the interpret callback, which only runs after a successful queue.
 */
1234 int mdc_intent_getattr_async(struct obd_export *exp,
1235                              struct md_enqueue_info *minfo,
1236                              struct ldlm_enqueue_info *einfo)
1237 {
1238         struct md_op_data       *op_data = &minfo->mi_data;
1239         struct lookup_intent    *it = &minfo->mi_it;
1240         struct ptlrpc_request   *req;
1241         struct mdc_getattr_args *ga;
1242         struct obd_device       *obddev = class_exp2obd(exp);
1243         struct ldlm_res_id       res_id;
1244         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1245          *     for statahead currently. Consider CMD in future, such two bits
1246          *     maybe managed by different MDS, should be adjusted then. */
1247         ldlm_policy_data_t       policy = {
1248                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1249                                                          MDS_INODELOCK_UPDATE }
1250                                  };
1251         int                      rc = 0;
1252         __u64                    flags = LDLM_FL_HAS_INTENT;
1253         ENTRY;
1254
1255         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1256                 LPF64"o\n",
1257                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1258                 ldlm_it2str(it->it_op), it->it_flags);
1259
1260         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1261         req = mdc_intent_getattr_pack(exp, it, op_data);
1262         if (IS_ERR(req))
1263                 RETURN(PTR_ERR(req));
1264
             /* The slot is released either on the error path just below or in
              * mdc_intent_getattr_async_interpret() once the reply is
              * handled. */
1265         rc = obd_get_request_slot(&obddev->u.cli);
1266         if (rc != 0) {
1267                 ptlrpc_req_finished(req);
1268                 RETURN(rc);
1269         }
1270
1271         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1272                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1273         if (rc < 0) {
1274                 obd_put_request_slot(&obddev->u.cli);
1275                 ptlrpc_req_finished(req);
1276                 RETURN(rc);
1277         }
1278
             /* Compile-time check that the per-request async-args area is
              * large enough to hold struct mdc_getattr_args. */
1279         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1280         ga = ptlrpc_req_async_args(req);
1281         ga->ga_exp = exp;
1282         ga->ga_minfo = minfo;
1283         ga->ga_einfo = einfo;
1284
1285         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1286         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1287
1288         RETURN(0);
1289 }