Whamcloud - gitweb
LU-6971 cleanup: not support remote client anymore
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
50
51 #include "mdc_internal.h"
52
53 struct mdc_getattr_args {
54         struct obd_export               *ga_exp;
55         struct md_enqueue_info          *ga_minfo;
56 };
57
58 int it_open_error(int phase, struct lookup_intent *it)
59 {
60         if (it_disposition(it, DISP_OPEN_LEASE)) {
61                 if (phase >= DISP_OPEN_LEASE)
62                         return it->it_status;
63                 else
64                         return 0;
65         }
66         if (it_disposition(it, DISP_OPEN_OPEN)) {
67                 if (phase >= DISP_OPEN_OPEN)
68                         return it->it_status;
69                 else
70                         return 0;
71         }
72
73         if (it_disposition(it, DISP_OPEN_CREATE)) {
74                 if (phase >= DISP_OPEN_CREATE)
75                         return it->it_status;
76                 else
77                         return 0;
78         }
79
80         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81                 if (phase >= DISP_LOOKUP_EXECD)
82                         return it->it_status;
83                 else
84                         return 0;
85         }
86
87         if (it_disposition(it, DISP_IT_EXECD)) {
88                 if (phase >= DISP_IT_EXECD)
89                         return it->it_status;
90                 else
91                         return 0;
92         }
93
94         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95         LBUG();
96
97         return 0;
98 }
99 EXPORT_SYMBOL(it_open_error);
100
101 /* this must be called on a lockh that is known to have a referenced lock */
102 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
103                       __u64 *bits)
104 {
105         struct ldlm_lock *lock;
106         struct inode *new_inode = data;
107         ENTRY;
108
109         if(bits)
110                 *bits = 0;
111
112         if (!*lockh)
113                 RETURN(0);
114
115         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
116
117         LASSERT(lock != NULL);
118         lock_res_and_lock(lock);
119         if (lock->l_resource->lr_lvb_inode &&
120             lock->l_resource->lr_lvb_inode != data) {
121                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
122                 LASSERTF(old_inode->i_state & I_FREEING,
123                          "Found existing inode %p/%lu/%u state %lu in lock: "
124                          "setting data to %p/%lu/%u\n", old_inode,
125                          old_inode->i_ino, old_inode->i_generation,
126                          old_inode->i_state,
127                          new_inode, new_inode->i_ino, new_inode->i_generation);
128         }
129         lock->l_resource->lr_lvb_inode = new_inode;
130         if (bits)
131                 *bits = lock->l_policy_data.l_inodebits.bits;
132
133         unlock_res_and_lock(lock);
134         LDLM_LOCK_PUT(lock);
135
136         RETURN(0);
137 }
138
139 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
140                               const struct lu_fid *fid, enum ldlm_type type,
141                               union ldlm_policy_data *policy,
142                               enum ldlm_mode mode, struct lustre_handle *lockh)
143 {
144         struct ldlm_res_id res_id;
145         enum ldlm_mode rc;
146         ENTRY;
147
148         fid_build_reg_res_name(fid, &res_id);
149         /* LU-4405: Clear bits not supported by server */
150         policy->l_inodebits.bits &= exp_connect_ibits(exp);
151         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
152                              &res_id, type, policy, mode, lockh, 0);
153         RETURN(rc);
154 }
155
156 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
157                       union ldlm_policy_data *policy, enum ldlm_mode mode,
158                       enum ldlm_cancel_flags flags, void *opaque)
159 {
160         struct obd_device *obd = class_exp2obd(exp);
161         struct ldlm_res_id res_id;
162         int rc;
163
164         ENTRY;
165
166         fid_build_reg_res_name(fid, &res_id);
167         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168                                              policy, mode, flags, opaque);
169         RETURN(rc);
170 }
171
172 int mdc_null_inode(struct obd_export *exp,
173                    const struct lu_fid *fid)
174 {
175         struct ldlm_res_id res_id;
176         struct ldlm_resource *res;
177         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
178         ENTRY;
179
180         LASSERTF(ns != NULL, "no namespace passed\n");
181
182         fid_build_reg_res_name(fid, &res_id);
183
184         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185         if (IS_ERR(res))
186                 RETURN(0);
187
188         lock_res(res);
189         res->lr_lvb_inode = NULL;
190         unlock_res(res);
191
192         ldlm_resource_putref(res);
193         RETURN(0);
194 }
195
196 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
197 {
198         /* Don't hold error requests for replay. */
199         if (req->rq_replay) {
200                 spin_lock(&req->rq_lock);
201                 req->rq_replay = 0;
202                 spin_unlock(&req->rq_lock);
203         }
204         if (rc && req->rq_transno != 0) {
205                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206                 LBUG();
207         }
208 }
209
210 /* Save a large LOV EA into the request buffer so that it is available
211  * for replay.  We don't do this in the initial request because the
212  * original request doesn't need this buffer (at most it sends just the
213  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
214  * buffer and may also be difficult to allocate and save a very large
215  * request buffer for each open. (bug 5707)
216  *
217  * OOM here may cause recovery failure if lmm is needed (only for the
218  * original open if the MDS crashed just when this client also OOM'd)
219  * but this is incredibly unlikely, and questionable whether the client
220  * could do MDS recovery under OOM anyways... */
221 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
222                                 struct mdt_body *body)
223 {
224         int     rc;
225
226         /* FIXME: remove this explicit offset. */
227         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228                                         body->mbo_eadatasize);
229         if (rc) {
230                 CERROR("Can't enlarge segment %d size to %d\n",
231                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
232                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
233                 body->mbo_eadatasize = 0;
234         }
235 }
236
237 static struct ptlrpc_request *
238 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
239                      struct md_op_data *op_data)
240 {
241         struct ptlrpc_request   *req;
242         struct obd_device       *obddev = class_exp2obd(exp);
243         struct ldlm_intent      *lit;
244         const void              *lmm = op_data->op_data;
245         __u32                    lmmsize = op_data->op_data_size;
246         struct list_head         cancels = LIST_HEAD_INIT(cancels);
247         int                      count = 0;
248         enum ldlm_mode           mode;
249         int                      rc;
250         ENTRY;
251
252         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
253
254         /* XXX: openlock is not cancelled for cross-refs. */
255         /* If inode is known, cancel conflicting OPEN locks. */
256         if (fid_is_sane(&op_data->op_fid2)) {
257                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
258                         if (it->it_flags & FMODE_WRITE)
259                                 mode = LCK_EX;
260                         else
261                                 mode = LCK_PR;
262                 } else {
263                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
264                                 mode = LCK_CW;
265 #ifdef FMODE_EXEC
266                         else if (it->it_flags & FMODE_EXEC)
267                                 mode = LCK_PR;
268 #endif
269                         else
270                                 mode = LCK_CR;
271                 }
272                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
273                                                 &cancels, mode,
274                                                 MDS_INODELOCK_OPEN);
275         }
276
277         /* If CREATE, cancel parent's UPDATE lock. */
278         if (it->it_op & IT_CREAT)
279                 mode = LCK_EX;
280         else
281                 mode = LCK_CR;
282         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
283                                          &cancels, mode,
284                                          MDS_INODELOCK_UPDATE);
285
286         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287                                    &RQF_LDLM_INTENT_OPEN);
288         if (req == NULL) {
289                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292
293         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
294                              op_data->op_namelen + 1);
295         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
296                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297
298         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
299         if (rc < 0) {
300                 ptlrpc_request_free(req);
301                 RETURN(ERR_PTR(rc));
302         }
303
304         spin_lock(&req->rq_lock);
305         req->rq_replay = req->rq_import->imp_replayable;
306         spin_unlock(&req->rq_lock);
307
308         /* pack the intent */
309         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
310         lit->opc = (__u64)it->it_op;
311
312         /* pack the intended request */
313         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
314                       lmmsize);
315
316         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
317                              obddev->u.cli.cl_max_mds_easize);
318         ptlrpc_request_set_replen(req);
319         return req;
320 }
321
322 static struct ptlrpc_request *
323 mdc_intent_getxattr_pack(struct obd_export *exp,
324                          struct lookup_intent *it,
325                          struct md_op_data *op_data)
326 {
327         struct ptlrpc_request   *req;
328         struct ldlm_intent      *lit;
329         int                     rc, count = 0;
330         __u32                   maxdata;
331         struct list_head        cancels = LIST_HEAD_INIT(cancels);
332
333         ENTRY;
334
335         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
336                                         &RQF_LDLM_INTENT_GETXATTR);
337         if (req == NULL)
338                 RETURN(ERR_PTR(-ENOMEM));
339
340         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341         if (rc) {
342                 ptlrpc_request_free(req);
343                 RETURN(ERR_PTR(rc));
344         }
345
346         /* pack the intent */
347         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348         lit->opc = IT_GETXATTR;
349
350         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
351
352         /* pack the intended request */
353         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
354                       0);
355
356         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
357                                 RCL_SERVER, maxdata);
358
359         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
360                                 RCL_SERVER, maxdata);
361
362         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
363                                 RCL_SERVER, maxdata);
364
365         ptlrpc_request_set_replen(req);
366
367         RETURN(req);
368 }
369
370 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
371                                                      struct lookup_intent *it,
372                                                      struct md_op_data *op_data)
373 {
374         struct ptlrpc_request *req;
375         struct obd_device     *obddev = class_exp2obd(exp);
376         struct ldlm_intent    *lit;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
381                                    &RQF_LDLM_INTENT_UNLINK);
382         if (req == NULL)
383                 RETURN(ERR_PTR(-ENOMEM));
384
385         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
386                              op_data->op_namelen + 1);
387
388         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
389         if (rc) {
390                 ptlrpc_request_free(req);
391                 RETURN(ERR_PTR(rc));
392         }
393
394         /* pack the intent */
395         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
396         lit->opc = (__u64)it->it_op;
397
398         /* pack the intended request */
399         mdc_unlink_pack(req, op_data);
400
401         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
402                              obddev->u.cli.cl_default_mds_easize);
403         ptlrpc_request_set_replen(req);
404         RETURN(req);
405 }
406
407 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
408                                                       struct lookup_intent *it,
409                                                       struct md_op_data *op_data)
410 {
411         struct ptlrpc_request   *req;
412         struct obd_device       *obddev = class_exp2obd(exp);
413         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
414                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
415                                          OBD_MD_MEA | OBD_MD_FLACL;
416         struct ldlm_intent      *lit;
417         int                      rc;
418         __u32                    easize;
419         ENTRY;
420
421         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422                                    &RQF_LDLM_INTENT_GETATTR);
423         if (req == NULL)
424                 RETURN(ERR_PTR(-ENOMEM));
425
426         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
427                              op_data->op_namelen + 1);
428
429         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430         if (rc) {
431                 ptlrpc_request_free(req);
432                 RETURN(ERR_PTR(rc));
433         }
434
435         /* pack the intent */
436         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
437         lit->opc = (__u64)it->it_op;
438
439         if (obddev->u.cli.cl_default_mds_easize > 0)
440                 easize = obddev->u.cli.cl_default_mds_easize;
441         else
442                 easize = obddev->u.cli.cl_max_mds_easize;
443
444         /* pack the intended request */
445         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
446
447         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
448         ptlrpc_request_set_replen(req);
449         RETURN(req);
450 }
451
452 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
453                                                      struct lookup_intent *it,
454                                                      struct md_op_data *unused)
455 {
456         struct obd_device     *obd = class_exp2obd(exp);
457         struct ptlrpc_request *req;
458         struct ldlm_intent    *lit;
459         struct layout_intent  *layout;
460         int rc;
461         ENTRY;
462
463         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
464                                 &RQF_LDLM_INTENT_LAYOUT);
465         if (req == NULL)
466                 RETURN(ERR_PTR(-ENOMEM));
467
468         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
469         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
470         if (rc) {
471                 ptlrpc_request_free(req);
472                 RETURN(ERR_PTR(rc));
473         }
474
475         /* pack the intent */
476         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
477         lit->opc = (__u64)it->it_op;
478
479         /* pack the layout intent request */
480         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
481         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
482          * set for replication */
483         layout->li_opc = LAYOUT_INTENT_ACCESS;
484
485         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
486                              obd->u.cli.cl_default_mds_easize);
487         ptlrpc_request_set_replen(req);
488         RETURN(req);
489 }
490
491 static struct ptlrpc_request *
492 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
493 {
494         struct ptlrpc_request *req;
495         int rc;
496         ENTRY;
497
498         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
499         if (req == NULL)
500                 RETURN(ERR_PTR(-ENOMEM));
501
502         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
503         if (rc) {
504                 ptlrpc_request_free(req);
505                 RETURN(ERR_PTR(rc));
506         }
507
508         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
509         ptlrpc_request_set_replen(req);
510         RETURN(req);
511 }
512
513 static int mdc_finish_enqueue(struct obd_export *exp,
514                               struct ptlrpc_request *req,
515                               struct ldlm_enqueue_info *einfo,
516                               struct lookup_intent *it,
517                               struct lustre_handle *lockh,
518                               int rc)
519 {
520         struct req_capsule  *pill = &req->rq_pill;
521         struct ldlm_request *lockreq;
522         struct ldlm_reply   *lockrep;
523         struct ldlm_lock    *lock;
524         void                *lvb_data = NULL;
525         __u32                lvb_len = 0;
526         ENTRY;
527
528         LASSERT(rc >= 0);
529         /* Similarly, if we're going to replay this request, we don't want to
530          * actually get a lock, just perform the intent. */
531         if (req->rq_transno || req->rq_replay) {
532                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
533                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
534         }
535
536         if (rc == ELDLM_LOCK_ABORTED) {
537                 einfo->ei_mode = 0;
538                 memset(lockh, 0, sizeof(*lockh));
539                 rc = 0;
540         } else { /* rc = 0 */
541                 lock = ldlm_handle2lock(lockh);
542                 LASSERT(lock != NULL);
543
544                 /* If the server gave us back a different lock mode, we should
545                  * fix up our variables. */
546                 if (lock->l_req_mode != einfo->ei_mode) {
547                         ldlm_lock_addref(lockh, lock->l_req_mode);
548                         ldlm_lock_decref(lockh, einfo->ei_mode);
549                         einfo->ei_mode = lock->l_req_mode;
550                 }
551                 LDLM_LOCK_PUT(lock);
552         }
553
554         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
555         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
556
557         it->it_disposition = (int)lockrep->lock_policy_res1;
558         it->it_status = (int)lockrep->lock_policy_res2;
559         it->it_lock_mode = einfo->ei_mode;
560         it->it_lock_handle = lockh->cookie;
561         it->it_request = req;
562
563         /* Technically speaking rq_transno must already be zero if
564          * it_status is in error, so the check is a bit redundant */
565         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
566                 mdc_clear_replay_flag(req, it->it_status);
567
568         /* If we're doing an IT_OPEN which did not result in an actual
569          * successful open, then we need to remove the bit which saves
570          * this request for unconditional replay.
571          *
572          * It's important that we do this first!  Otherwise we might exit the
573          * function without doing so, and try to replay a failed create
574          * (bug 3440) */
575         if (it->it_op & IT_OPEN && req->rq_replay &&
576             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
577                 mdc_clear_replay_flag(req, it->it_status);
578
579         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
580                   it->it_op, it->it_disposition, it->it_status);
581
582         /* We know what to expect, so we do any byte flipping required here */
583         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
584                 struct mdt_body *body;
585
586                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
587                 if (body == NULL) {
588                         CERROR ("Can't swab mdt_body\n");
589                         RETURN (-EPROTO);
590                 }
591
592                 if (it_disposition(it, DISP_OPEN_OPEN) &&
593                     !it_open_error(DISP_OPEN_OPEN, it)) {
594                         /*
595                          * If this is a successful OPEN request, we need to set
596                          * replay handler and data early, so that if replay
597                          * happens immediately after swabbing below, new reply
598                          * is swabbed by that handler correctly.
599                          */
600                         mdc_set_open_replay_data(NULL, NULL, it);
601                 }
602
603                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
604                         void *eadata;
605
606                         mdc_update_max_ea_from_body(exp, body);
607
608                         /*
609                          * The eadata is opaque; just check that it is there.
610                          * Eventually, obd_unpackmd() will check the contents.
611                          */
612                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
613                                                         body->mbo_eadatasize);
614                         if (eadata == NULL)
615                                 RETURN(-EPROTO);
616
617                         /* save lvb data and length in case this is for layout
618                          * lock */
619                         lvb_data = eadata;
620                         lvb_len = body->mbo_eadatasize;
621
622                         /*
623                          * We save the reply LOV EA in case we have to replay a
624                          * create for recovery.  If we didn't allocate a large
625                          * enough request buffer above we need to reallocate it
626                          * here to hold the actual LOV EA.
627                          *
628                          * To not save LOV EA if request is not going to replay
629                          * (for example error one).
630                          */
631                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
632                                 void *lmm;
633                                 if (req_capsule_get_size(pill, &RMF_EADATA,
634                                                          RCL_CLIENT) <
635                                     body->mbo_eadatasize)
636                                         mdc_realloc_openmsg(req, body);
637                                 else
638                                         req_capsule_shrink(pill, &RMF_EADATA,
639                                                            body->mbo_eadatasize,
640                                                            RCL_CLIENT);
641
642                                 req_capsule_set_size(pill, &RMF_EADATA,
643                                                      RCL_CLIENT,
644                                                      body->mbo_eadatasize);
645
646                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
647                                 if (lmm)
648                                         memcpy(lmm, eadata,
649                                                body->mbo_eadatasize);
650                         }
651                 }
652         } else if (it->it_op & IT_LAYOUT) {
653                 /* maybe the lock was granted right away and layout
654                  * is packed into RMF_DLM_LVB of req */
655                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
656                 if (lvb_len > 0) {
657                         lvb_data = req_capsule_server_sized_get(pill,
658                                                         &RMF_DLM_LVB, lvb_len);
659                         if (lvb_data == NULL)
660                                 RETURN(-EPROTO);
661                 }
662         }
663
664         /* fill in stripe data for layout lock.
665          * LU-6581: trust layout data only if layout lock is granted. The MDT
666          * has stopped sending layout unless the layout lock is granted. The
667          * client still does this checking in case it's talking with an old
668          * server. - Jinshan */
669         lock = ldlm_handle2lock(lockh);
670         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
671             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
672                 void *lmm;
673
674                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
675                         ldlm_it2str(it->it_op), lvb_len);
676
677                 OBD_ALLOC_LARGE(lmm, lvb_len);
678                 if (lmm == NULL) {
679                         LDLM_LOCK_PUT(lock);
680                         RETURN(-ENOMEM);
681                 }
682                 memcpy(lmm, lvb_data, lvb_len);
683
684                 /* install lvb_data */
685                 lock_res_and_lock(lock);
686                 if (lock->l_lvb_data == NULL) {
687                         lock->l_lvb_type = LVB_T_LAYOUT;
688                         lock->l_lvb_data = lmm;
689                         lock->l_lvb_len = lvb_len;
690                         lmm = NULL;
691                 }
692                 unlock_res_and_lock(lock);
693                 if (lmm != NULL)
694                         OBD_FREE_LARGE(lmm, lvb_len);
695         }
696         if (lock != NULL)
697                 LDLM_LOCK_PUT(lock);
698
699         RETURN(rc);
700 }
701
702 /* We always reserve enough space in the reply packet for a stripe MD, because
703  * we don't know in advance the file type. */
704 int mdc_enqueue(struct obd_export *exp,
705                 struct ldlm_enqueue_info *einfo,
706                 const union ldlm_policy_data *policy,
707                 struct lookup_intent *it, struct md_op_data *op_data,
708                 struct lustre_handle *lockh, __u64 extra_lock_flags)
709 {
710         struct obd_device *obddev = class_exp2obd(exp);
711         struct ptlrpc_request *req = NULL;
712         __u64 flags, saved_flags = extra_lock_flags;
713         struct ldlm_res_id res_id;
714         static const union ldlm_policy_data lookup_policy = {
715                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
716         static const union ldlm_policy_data update_policy = {
717                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
718         static const union ldlm_policy_data layout_policy = {
719                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
720         static const union ldlm_policy_data getxattr_policy = {
721                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
722         int generation, resends = 0;
723         struct ldlm_reply *lockrep;
724         enum lvb_type lvb_type = 0;
725         int rc;
726         ENTRY;
727
728         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
729                  einfo->ei_type);
730         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
731
732         if (it != NULL) {
733                 LASSERT(policy == NULL);
734
735                 saved_flags |= LDLM_FL_HAS_INTENT;
736                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
737                         policy = &update_policy;
738                 else if (it->it_op & IT_LAYOUT)
739                         policy = &layout_policy;
740                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
741                         policy = &getxattr_policy;
742                 else
743                         policy = &lookup_policy;
744         }
745
746         generation = obddev->u.cli.cl_import->imp_generation;
747 resend:
748         flags = saved_flags;
749         if (it == NULL) {
750                 /* The only way right now is FLOCK. */
751                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
752                          einfo->ei_type);
753                 res_id.name[3] = LDLM_FLOCK;
754         } else if (it->it_op & IT_OPEN) {
755                 req = mdc_intent_open_pack(exp, it, op_data);
756         } else if (it->it_op & IT_UNLINK) {
757                 req = mdc_intent_unlink_pack(exp, it, op_data);
758         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
759                 req = mdc_intent_getattr_pack(exp, it, op_data);
760         } else if (it->it_op & IT_READDIR) {
761                 req = mdc_enqueue_pack(exp, 0);
762         } else if (it->it_op & IT_LAYOUT) {
763                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
764                         RETURN(-EOPNOTSUPP);
765                 req = mdc_intent_layout_pack(exp, it, op_data);
766                 lvb_type = LVB_T_LAYOUT;
767         } else if (it->it_op & IT_GETXATTR) {
768                 req = mdc_intent_getxattr_pack(exp, it, op_data);
769         } else {
770                 LBUG();
771                 RETURN(-EINVAL);
772         }
773
774         if (IS_ERR(req))
775                 RETURN(PTR_ERR(req));
776
777         if (resends) {
778                 req->rq_generation_set = 1;
779                 req->rq_import_generation = generation;
780                 req->rq_sent = cfs_time_current_sec() + resends;
781         }
782
783         /* It is important to obtain modify RPC slot first (if applicable), so
784          * that threads that are waiting for a modify RPC slot are not polluting
785          * our rpcs in flight counter.
786          * We do not do flock request limiting, though */
787         if (it) {
788                 mdc_get_mod_rpc_slot(req, it);
789                 rc = obd_get_request_slot(&obddev->u.cli);
790                 if (rc != 0) {
791                         mdc_put_mod_rpc_slot(req, it);
792                         mdc_clear_replay_flag(req, 0);
793                         ptlrpc_req_finished(req);
794                         RETURN(rc);
795                 }
796         }
797
798         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
799                               0, lvb_type, lockh, 0);
800         if (!it) {
801                 /* For flock requests we immediatelly return without further
802                    delay and let caller deal with the rest, since rest of
803                    this function metadata processing makes no sense for flock
804                    requests anyway. But in case of problem during comms with
805                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
806                    can not rely on caller and this mainly for F_UNLCKs
807                    (explicits or automatically generated by Kernel to clean
808                    current FLocks upon exit) that can't be trashed */
809                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
810                     (einfo->ei_type == LDLM_FLOCK) &&
811                     (einfo->ei_mode == LCK_NL))
812                         goto resend;
813                 RETURN(rc);
814         }
815
816         obd_put_request_slot(&obddev->u.cli);
817         mdc_put_mod_rpc_slot(req, it);
818
819         if (rc < 0) {
820                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
821                        obddev->obd_name, rc);
822
823                 mdc_clear_replay_flag(req, rc);
824                 ptlrpc_req_finished(req);
825                 RETURN(rc);
826         }
827
828         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
829         LASSERT(lockrep != NULL);
830
831         lockrep->lock_policy_res2 =
832                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
833
834         /* Retry infinitely when the server returns -EINPROGRESS for the
835          * intent operation, when server returns -EINPROGRESS for acquiring
836          * intent lock, we'll retry in after_reply(). */
837         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
838                 mdc_clear_replay_flag(req, rc);
839                 ptlrpc_req_finished(req);
840                 resends++;
841
842                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
843                        obddev->obd_name, resends, it->it_op,
844                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
845
846                 if (generation == obddev->u.cli.cl_import->imp_generation) {
847                         goto resend;
848                 } else {
849                         CDEBUG(D_HA, "resend cross eviction\n");
850                         RETURN(-EIO);
851                 }
852         }
853
854         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
855         if (rc < 0) {
856                 if (lustre_handle_is_used(lockh)) {
857                         ldlm_lock_decref(lockh, einfo->ei_mode);
858                         memset(lockh, 0, sizeof(*lockh));
859                 }
860                 ptlrpc_req_finished(req);
861
862                 it->it_lock_handle = 0;
863                 it->it_lock_mode = 0;
864                 it->it_request = NULL;
865         }
866
867         RETURN(rc);
868 }
869
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871                                   struct ptlrpc_request *request,
872                                   struct md_op_data *op_data,
873                                   struct lookup_intent *it,
874                                   struct lustre_handle *lockh)
875 {
876         struct lustre_handle old_lock;
877         struct mdt_body *mdt_body;
878         struct ldlm_lock *lock;
879         int rc;
880         ENTRY;
881
882         LASSERT(request != NULL);
883         LASSERT(request != LP_POISON);
884         LASSERT(request->rq_repmsg != LP_POISON);
885
886         if (it->it_op & IT_READDIR)
887                 RETURN(0);
888
889         if (!it_disposition(it, DISP_IT_EXECD)) {
890                 /* The server failed before it even started executing the
891                  * intent, i.e. because it couldn't unpack the request. */
892                 LASSERT(it->it_status != 0);
893                 RETURN(it->it_status);
894         }
895         rc = it_open_error(DISP_IT_EXECD, it);
896         if (rc)
897                 RETURN(rc);
898
899         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
900         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
901
902         rc = it_open_error(DISP_LOOKUP_EXECD, it);
903         if (rc)
904                 RETURN(rc);
905
906         /* keep requests around for the multiple phases of the call
907          * this shows the DISP_XX must guarantee we make it into the call
908          */
909         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
910             it_disposition(it, DISP_OPEN_CREATE) &&
911             !it_open_error(DISP_OPEN_CREATE, it)) {
912                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
913                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
914         }
915         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
916             it_disposition(it, DISP_OPEN_OPEN) &&
917             !it_open_error(DISP_OPEN_OPEN, it)) {
918                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
919                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
920                 /* BUG 11546 - eviction in the middle of open rpc processing */
921                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
922         }
923
924         if (it->it_op & IT_CREAT) {
925                 /* XXX this belongs in ll_create_it */
926         } else if (it->it_op == IT_OPEN) {
927                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
928         } else {
929                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
930         }
931
932         /* If we already have a matching lock, then cancel the new
933          * one.  We have to set the data here instead of in
934          * mdc_enqueue, because we need to use the child's inode as
935          * the l_ast_data to match, and that's not available until
936          * intent_finish has performed the iget().) */
937         lock = ldlm_handle2lock(lockh);
938         if (lock) {
939                 union ldlm_policy_data policy = lock->l_policy_data;
940                 LDLM_DEBUG(lock, "matching against this");
941
942                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
943                                          &lock->l_resource->lr_name),
944                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
945                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
946                 LDLM_LOCK_PUT(lock);
947
948                 memcpy(&old_lock, lockh, sizeof(*lockh));
949                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
950                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
951                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
952                         memcpy(lockh, &old_lock, sizeof(old_lock));
953                         it->it_lock_handle = lockh->cookie;
954                 }
955         }
956
957         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
958                 (int)op_data->op_namelen, op_data->op_name,
959                 ldlm_it2str(it->it_op), it->it_status,
960                 it->it_disposition, rc);
961
962         RETURN(rc);
963 }
964
965 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
966                         struct lu_fid *fid, __u64 *bits)
967 {
968         /* We could just return 1 immediately, but since we should only
969          * be called in revalidate_it if we already have a lock, let's
970          * verify that. */
971         struct ldlm_res_id res_id;
972         struct lustre_handle lockh;
973         union ldlm_policy_data policy;
974         enum ldlm_mode mode;
975         ENTRY;
976
977         if (it->it_lock_handle) {
978                 lockh.cookie = it->it_lock_handle;
979                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
980         } else {
981                 fid_build_reg_res_name(fid, &res_id);
982                 switch (it->it_op) {
983                 case IT_GETATTR:
984                         /* File attributes are held under multiple bits:
985                          * nlink is under lookup lock, size and times are
986                          * under UPDATE lock and recently we've also got
987                          * a separate permissions lock for owner/group/acl that
988                          * were protected by lookup lock before.
989                          * Getattr must provide all of that information,
990                          * so we need to ensure we have all of those locks.
991                          * Unfortunately, if the bits are split across multiple
992                          * locks, there's no easy way to match all of them here,
993                          * so an extra RPC would be performed to fetch all
994                          * of those bits at once for now. */
995                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
996                          * but for old MDTs (< 2.4), permission is covered
997                          * by LOOKUP lock, so it needs to match all bits here.*/
998                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
999                                                   MDS_INODELOCK_LOOKUP |
1000                                                   MDS_INODELOCK_PERM;
1001                         break;
1002                 case IT_READDIR:
1003                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1004                         break;
1005                 case IT_LAYOUT:
1006                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1007                         break;
1008                 default:
1009                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1010                         break;
1011                 }
1012
1013                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1014                                       LDLM_IBITS, &policy,
1015                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1016                                       &lockh);
1017         }
1018
1019         if (mode) {
1020                 it->it_lock_handle = lockh.cookie;
1021                 it->it_lock_mode = mode;
1022         } else {
1023                 it->it_lock_handle = 0;
1024                 it->it_lock_mode = 0;
1025         }
1026
1027         RETURN(!!mode);
1028 }
1029
1030 /*
1031  * This long block is all about fixing up the lock and request state
1032  * so that it is correct as of the moment _before_ the operation was
1033  * applied; that way, the VFS will think that everything is normal and
1034  * call Lustre's regular VFS methods.
1035  *
1036  * If we're performing a creation, that means that unless the creation
1037  * failed with EEXIST, we should fake up a negative dentry.
1038  *
1039  * For everything else, we want to lookup to succeed.
1040  *
1041  * One additional note: if CREATE or OPEN succeeded, we add an extra
1042  * reference to the request because we need to keep it around until
1043  * ll_create/ll_open gets called.
1044  *
1045  * The server will return to us, in it_disposition, an indication of
1046  * exactly what it_status refers to.
1047  *
1048  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1049  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1050  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1051  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1052  * was successful.
1053  *
1054  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1055  * child lookup.
1056  */
1057 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1058                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1059                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1060 {
1061         struct ldlm_enqueue_info einfo = {
1062                 .ei_type        = LDLM_IBITS,
1063                 .ei_mode        = it_to_lock_mode(it),
1064                 .ei_cb_bl       = cb_blocking,
1065                 .ei_cb_cp       = ldlm_completion_ast,
1066         };
1067         struct lustre_handle lockh;
1068         int rc = 0;
1069         ENTRY;
1070         LASSERT(it);
1071
1072         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1073                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1074                 op_data->op_name, PFID(&op_data->op_fid2),
1075                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1076                 it->it_flags);
1077
1078         lockh.cookie = 0;
1079         if (fid_is_sane(&op_data->op_fid2) &&
1080             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1081                 /* We could just return 1 immediately, but since we should only
1082                  * be called in revalidate_it if we already have a lock, let's
1083                  * verify that. */
1084                 it->it_lock_handle = 0;
1085                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1086                 /* Only return failure if it was not GETATTR by cfid
1087                    (from inode_revalidate) */
1088                 if (rc || op_data->op_namelen != 0)
1089                         RETURN(rc);
1090         }
1091
1092         /* For case if upper layer did not alloc fid, do it now. */
1093         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1094                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1095                 if (rc < 0) {
1096                         CERROR("Can't alloc new fid, rc %d\n", rc);
1097                         RETURN(rc);
1098                 }
1099         }
1100
1101         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1102                          extra_lock_flags);
1103         if (rc < 0)
1104                 RETURN(rc);
1105
1106         *reqp = it->it_request;
1107         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1108         RETURN(rc);
1109 }
1110
1111 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1112                                               struct ptlrpc_request *req,
1113                                               void *args, int rc)
1114 {
1115         struct mdc_getattr_args  *ga = args;
1116         struct obd_export        *exp = ga->ga_exp;
1117         struct md_enqueue_info   *minfo = ga->ga_minfo;
1118         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1119         struct lookup_intent     *it;
1120         struct lustre_handle     *lockh;
1121         struct obd_device        *obddev;
1122         struct ldlm_reply        *lockrep;
1123         __u64                     flags = LDLM_FL_HAS_INTENT;
1124         ENTRY;
1125
1126         it    = &minfo->mi_it;
1127         lockh = &minfo->mi_lockh;
1128
1129         obddev = class_exp2obd(exp);
1130
1131         obd_put_request_slot(&obddev->u.cli);
1132         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1133                 rc = -ETIMEDOUT;
1134
1135         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1136                                    &flags, NULL, 0, lockh, rc);
1137         if (rc < 0) {
1138                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1139                 mdc_clear_replay_flag(req, rc);
1140                 GOTO(out, rc);
1141         }
1142
1143         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1144         LASSERT(lockrep != NULL);
1145
1146         lockrep->lock_policy_res2 =
1147                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1148
1149         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1150         if (rc)
1151                 GOTO(out, rc);
1152
1153         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1154         EXIT;
1155
1156 out:
1157         minfo->mi_cb(req, minfo, rc);
1158         return 0;
1159 }
1160
1161 int mdc_intent_getattr_async(struct obd_export *exp,
1162                              struct md_enqueue_info *minfo)
1163 {
1164         struct md_op_data       *op_data = &minfo->mi_data;
1165         struct lookup_intent    *it = &minfo->mi_it;
1166         struct ptlrpc_request   *req;
1167         struct mdc_getattr_args *ga;
1168         struct obd_device       *obddev = class_exp2obd(exp);
1169         struct ldlm_res_id       res_id;
1170         union ldlm_policy_data policy = {
1171                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1172                                                  MDS_INODELOCK_UPDATE } };
1173         int                      rc = 0;
1174         __u64                    flags = LDLM_FL_HAS_INTENT;
1175         ENTRY;
1176
1177         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1178                 LPF64"o\n",
1179                 (int)op_data->op_namelen, op_data->op_name,
1180                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1181
1182         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1183         req = mdc_intent_getattr_pack(exp, it, op_data);
1184         if (IS_ERR(req))
1185                 RETURN(PTR_ERR(req));
1186
1187         rc = obd_get_request_slot(&obddev->u.cli);
1188         if (rc != 0) {
1189                 ptlrpc_req_finished(req);
1190                 RETURN(rc);
1191         }
1192
1193         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1194                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1195         if (rc < 0) {
1196                 obd_put_request_slot(&obddev->u.cli);
1197                 ptlrpc_req_finished(req);
1198                 RETURN(rc);
1199         }
1200
1201         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1202         ga = ptlrpc_req_async_args(req);
1203         ga->ga_exp = exp;
1204         ga->ga_minfo = minfo;
1205
1206         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1207         ptlrpcd_add_req(req);
1208
1209         RETURN(0);
1210 }