Whamcloud - gitweb
b=17682 limit performance impact of rpctrace, dlmtrace & quota
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_OPEN)) {
59                 if (phase >= DISP_OPEN_OPEN)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64
65         if (it_disposition(it, DISP_OPEN_CREATE)) {
66                 if (phase >= DISP_OPEN_CREATE)
67                         return it->d.lustre.it_status;
68                 else
69                         return 0;
70         }
71
72         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
73                 if (phase >= DISP_LOOKUP_EXECD)
74                         return it->d.lustre.it_status;
75                 else
76                         return 0;
77         }
78
79         if (it_disposition(it, DISP_IT_EXECD)) {
80                 if (phase >= DISP_IT_EXECD)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
86                it->d.lustre.it_status);
87         LBUG();
88         return 0;
89 }
90 EXPORT_SYMBOL(it_open_error);
91
92 /* this must be called on a lockh that is known to have a referenced lock */
93 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
94 {
95         struct ldlm_lock *lock;
96         struct lustre_handle *lockh = (struct lustre_handle *)l;
97         ENTRY;
98
99         if(bits)
100                 *bits = 0;
101
102         if (!*l) {
103                 EXIT;
104                 return;
105         }
106
107         lock = ldlm_handle2lock(lockh);
108
109         LASSERT(lock != NULL);
110         lock_res_and_lock(lock);
111 #ifdef __KERNEL__
112         if (lock->l_ast_data && lock->l_ast_data != data) {
113                 struct inode *new_inode = data;
114                 struct inode *old_inode = lock->l_ast_data;
115                 LASSERTF(old_inode->i_state & I_FREEING,
116                          "Found existing inode %p/%lu/%u state %lu in lock: "
117                          "setting data to %p/%lu/%u\n", old_inode,
118                          old_inode->i_ino, old_inode->i_generation,
119                          old_inode->i_state,
120                          new_inode, new_inode->i_ino, new_inode->i_generation);
121         }
122 #endif
123         lock->l_ast_data = data;
124         if (bits)
125                 *bits = lock->l_policy_data.l_inodebits.bits;
126         unlock_res_and_lock(lock);
127         LDLM_LOCK_PUT(lock);
128
129         EXIT;
130 }
131 EXPORT_SYMBOL(mdc_set_lock_data);
132
133 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
134                       ldlm_iterator_t it, void *data)
135 {
136         struct ldlm_res_id res_id;
137         ENTRY;
138
139         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
140         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
141                               it, data);
142
143         EXIT;
144         return 0;
145 }
146
147 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
148 {
149         /* Don't hold error requests for replay. */
150         if (req->rq_replay) {
151                 spin_lock(&req->rq_lock);
152                 req->rq_replay = 0;
153                 spin_unlock(&req->rq_lock);
154         }
155         if (rc && req->rq_transno != 0) {
156                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
157                 LBUG();
158         }
159 }
160
/* Return the smallest power of two strictly greater than @val
 * (e.g. 5 -> 8, 8 -> 16, 0 -> 1).  Only meaningful for val >= 0. */
static int round_up(int val)
{
        int pow2;

        for (pow2 = 1; val != 0; val >>= 1)
                pow2 <<= 1;

        return pow2;
}
170
171 /* Save a large LOV EA into the request buffer so that it is available
172  * for replay.  We don't do this in the initial request because the
173  * original request doesn't need this buffer (at most it sends just the
174  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
175  * buffer and may also be difficult to allocate and save a very large
176  * request buffer for each open. (bug 5707)
177  *
178  * OOM here may cause recovery failure if lmm is needed (only for the
179  * original open if the MDS crashed just when this client also OOM'd)
180  * but this is incredibly unlikely, and questionable whether the client
181  * could do MDS recovery under OOM anyways... */
/* Grow the LOV EA slot in @req's request buffer to body->eadatasize so
 * the reply EA can be stashed for open replay (see comment above).  On
 * allocation failure the EA is dropped from the replay data instead of
 * failing the open. */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                                struct mds_body *body)
{
        int old_len, new_size, old_size;
        struct lustre_msg *old_msg = req->rq_reqmsg;
        struct lustre_msg *new_msg;
        int offset;

        /* The EA buffer sits at a different offset past DLM_INTENT_REC_OFF
         * for 2.0 servers; must match mdc_intent_open_pack()'s layout. */
        if (mdc_req_is_2_0_server(req))
                offset = 4;
        else
                offset = 2;

        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
        old_size = lustre_packed_msg_size(old_msg);
        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
                              body->eadatasize);
        /* old buffer is larger than needed: the shrunken buflen set just
         * above suffices, no reallocation required */
        if (old_len > body->eadatasize)
                return;

        /* new_size reflects the enlarged buflen set above */
        new_size = lustre_packed_msg_size(old_msg);

        OBD_ALLOC(new_msg, new_size);
        if (new_msg != NULL) {
                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
                          body->eadatasize);
                memcpy(new_msg, old_msg, old_size);

                /* swap in the new buffer under rq_lock so rq_reqmsg and
                 * rq_reqlen are always seen as a consistent pair */
                spin_lock(&req->rq_lock);
                req->rq_reqmsg = new_msg;
                req->rq_reqlen = new_size;
                spin_unlock(&req->rq_lock);

                OBD_FREE(old_msg, old_size);
        } else {
                /* OOM: restore the old buflen and drop the EA from the
                 * replay data (see header comment for the tradeoff) */
                lustre_msg_set_buflen(old_msg,
                                      DLM_INTENT_REC_OFF + offset, old_len);
                body->valid &= ~OBD_MD_FLEASIZE;
                body->eadatasize = 0;
        }
}
224
/* Build an LDLM enqueue request carrying an IT_OPEN intent: lays out
 * buffers for the create record, name and a (at least default-sized)
 * LOV EA, cancels conflicting child OPEN / parent UPDATE locks, and
 * packs the open record.  Returns the prepared request or NULL. */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct mdc_op_data *data,
                                                   void *lmm, __u32 lmmsize)
{
        struct ptlrpc_request *req;
        struct ldlm_intent *lit;
        struct obd_device *obddev = class_exp2obd(exp);
        __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
                        /* As an optimization, we allocate an RPC request buffer
                         * for at least a default-sized LOV EA even if we aren't
                         * sending one.  We grow the whole request to the next
                         * power-of-two size since we get that much from a slab
                         * allocation anyways. This avoids an allocation below
                         * in the common case where we need to save a
                         * default-sized LOV EA for open replay. */
                        [DLM_INTENT_REC_OFF+2]= max(lmmsize,
                                         obddev->u.cli.cl_default_mds_easize) };
        __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                        cl_max_mds_easize,
                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
        CFS_LIST_HEAD(cancels);
        int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
        int count = 0;
        int bufcount = 6;
        int repbufcount = 5;
        int mode;
        int rc;
        ENTRY;

        /* Opens are always for regular files. */
        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
        /* 2.0 servers use the mdt_rec_create layout and insert capa slots
         * before the name and EA buffers, in both request and reply. */
        if (mdc_exp_is_2_0_server(exp)) {
                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
                size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
                size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
                size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
                size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
                bufcount = 8;
                repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
                repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
                repbufcount = 7;
        }
        /* Round the total request size up to the next power of two (see
         * comment above) by growing the last buffer, capped at the
         * largest EA the MDS can return. */
        rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                             bufcount, size);
        if (rc & (rc - 1))
                size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
                                         (__u32)obddev->u.cli.cl_max_mds_easize);

        /* If inode is known, cancel conflicting OPEN locks. */
        if (data->fid2.id) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                        mode = LCK_CW;
#ifdef FMODE_EXEC
                else if (it->it_flags & FMODE_EXEC)
                        mode = LCK_PR;
#endif
                else
                        mode = LCK_CR;
                count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
                                                mode, MDS_INODELOCK_OPEN);
        }

        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT || do_join)
                mode = LCK_EX;
        else
                mode = LCK_CR;
        count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
                                         MDS_INODELOCK_UPDATE);
        if (do_join) {
                __u64 head_size = (*(__u64 *)data->data);
                /* join is like an unlink of the tail */
                if (mdc_exp_is_2_0_server(exp)) {
                        size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
                } else {
                        size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
                }
                bufcount++;

                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                if (req)
                        mdc_join_pack(req, bufcount - 1, data, head_size);
        } else {
                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                it->it_create_mode &= ~M_JOIN_FILE;
        }

        if (req) {
                /* Opens are replayed; the flag is cleared later by
                 * mdc_finish_enqueue() if the open does not succeed. */
                spin_lock(&req->rq_lock);
                req->rq_replay = 1;
                spin_unlock(&req->rq_lock);

                /* pack the intent */
                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                     sizeof(*lit));
                lit->opc = (__u64)it->it_op;

                /* pack the intended request */
                mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
                              it->it_create_mode, 0, it->it_flags,
                              lmm, lmmsize);

                ptlrpc_req_set_repsize(req, repbufcount, repsize);
        }
        RETURN(req);
}
338
339 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
340                                                      struct lookup_intent *it,
341                                                      struct mdc_op_data *data)
342 {
343         struct ptlrpc_request *req;
344         struct ldlm_intent *lit;
345         struct obd_device *obddev = class_exp2obd(exp);
346         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
347                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
348                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
349                         [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
350                                                 sizeof(struct mdt_rec_unlink) :
351                                                 sizeof(struct mds_rec_unlink),
352                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
353         __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
354                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
355                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
356                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
357                                                         cl_max_mds_easize,
358                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
359                                                         cl_max_mds_cookiesize };
360         ENTRY;
361
362         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
363         if (req) {
364                 /* pack the intent */
365                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
366                                      sizeof(*lit));
367                 lit->opc = (__u64)it->it_op;
368
369                 /* pack the intended request */
370                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
371
372                 ptlrpc_req_set_repsize(req, 5, repsize);
373         }
374         RETURN(req);
375 }
376
377 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
378                                                      struct lookup_intent *it,
379                                                      struct mdc_op_data *data)
380 {
381         struct ptlrpc_request *req;
382         struct ldlm_intent *lit;
383         struct obd_device *obddev = class_exp2obd(exp);
384         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
385                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
386                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
387                         [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
388                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
389                         [DLM_INTENT_REC_OFF+2]= 0 };
390         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
391                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
392                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
393                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
394                                                         cl_max_mds_easize,
395                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
396                            [DLM_REPLY_REC_OFF+3] = 0 };
397         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
398                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
399         int bufcount = 5;
400         ENTRY;
401
402         if (mdc_exp_is_2_0_server(exp)) {
403                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
404                 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
405                 bufcount = 6;
406         }
407         req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
408         if (req) {
409                 /* pack the intent */
410                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
411                                      sizeof(*lit));
412                 lit->opc = (__u64)it->it_op;
413
414                 /* pack the intended request */
415                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
416                                  data);
417                 ptlrpc_req_set_repsize(req, bufcount, repsize);
418         }
419         RETURN(req);
420 }
421
422 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
423 {
424         struct ptlrpc_request *req;
425         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
426                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
427         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
428                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
429                            [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
430         ENTRY;
431
432         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
433         if (req)
434                 ptlrpc_req_set_repsize(req, 3, repsize);
435         RETURN(req);
436 }
437
/* Digest the reply to an intent enqueue: fix up the granted lock mode
 * and handle, copy the intent disposition/status into @it, drop the
 * replay flag of failed opens, and swab the reply body plus any LOV EA
 * (which is copied back into the request buffer for open replay).
 * @rc is the (non-negative) enqueue result; returns @rc, or -EPROTO on
 * a malformed reply. */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct ldlm_request *lockreq;
        struct ldlm_reply *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
                                         sizeof(*lockreq));
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* The intent was executed but no lock was granted. */
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                LDLM_LOCK_PUT(lock);
        }

        lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*lockrep));
        LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
        /* swabbed by ldlm_cli_enqueue() */
        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));

        /* Hand the intent execution results back to the caller. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if ((it->it_op & IT_OPEN) &&
            req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||
             it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mds_body *body;

                body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                         lustre_swab_mds_body);
                if (body == NULL) {
                        CERROR ("Can't swab mds_body\n");
                        RETURN (-EPROTO);
                }

                /* If this is a successful OPEN request, we need to set
                   replay handler and data early, so that if replay happens
                   immediately after swabbing below, new reply is swabbed
                   by that handler correctly */
                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it))
                        mdc_set_open_replay_data(NULL, req);

                if ((body->valid & OBD_MD_FLEASIZE) != 0) {
                        void *eadata;

                        mdc_update_max_ea_from_body(exp, body);

                        /* The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents */
                        eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
                                                    body->eadatasize, NULL);
                        if (eadata == NULL) {
                                CERROR ("Missing/short eadata\n");
                                RETURN (-EPROTO);
                        }
                        /* We save the reply LOV EA in case we have to replay
                         * a create for recovery.  If we didn't allocate a
                         * large enough request buffer above we need to
                         * reallocate it here to hold the actual LOV EA. */
                        if (it->it_op & IT_OPEN) {
                                int offset = DLM_INTENT_REC_OFF;
                                void *lmm;

                                /* EA slot offset differs for 2.0 servers,
                                 * matching mdc_intent_open_pack()'s layout */
                                if (mdc_req_is_2_0_server(req))
                                        offset += 4;
                                else
                                        offset += 2;

                                if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);

                                lmm = lustre_msg_buf(req->rq_reqmsg, offset,
                                                     body->eadatasize);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }
        }

        RETURN(rc);
}
566
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
/* Pack the intent request matching it->it_op, send it as an LDLM
 * enqueue (with rpc-lock and rpcs-in-flight accounting around the
 * wire call), then digest the reply via mdc_finish_enqueue().
 * Returns 0 on success or a negative errno. */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct mdc_op_data *data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                int extra_lock_flags)
{
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        struct ldlm_res_id res_id;
        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        fid_build_reg_res_name((void *)&data->fid1, &res_id);
        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
        /* These ops want the UPDATE ibit rather than the default LOOKUP. */
        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;

        if (it->it_op & IT_OPEN) {
                /* For creates against a 2.0 server the client allocates
                 * the new fid itself; the original fid2 is kept in fid3. */
                if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
                        struct client_obd *cli = &obddev->u.cli;
                        data->fid3 = data->fid2;
                        rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
                        if (rc) {
                                CERROR("fid allocation result: %d\n", rc);
                                RETURN(rc);
                        }
                }
                req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
                if (it->it_create_mode & M_JOIN_FILE) {
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                }
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_lookup_pack(exp, it, data);
        } else if (it->it_op == IT_READDIR) {
                req = mdc_intent_readdir_pack(exp);
        } else {
                CERROR("bad it_op %x\n", it->it_op);
                RETURN(-EINVAL);
        }

        if (!req)
                RETURN(-ENOMEM);

         /* It is important to obtain rpc_lock first (if applicable), so that
          * threads that are serialised with rpc_lock are not polluting our
          * rpcs in flight counter */
        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        mdc_enter_request(&obddev->u.cli);
        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
                              0, NULL, lockh, 0);
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        if (rc < 0) {
                /* enqueue failed outright: make sure the request is not
                 * kept for replay before releasing it */
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

        RETURN(rc);
}
EXPORT_SYMBOL(mdc_enqueue);
635
636 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
637                         struct ll_fid *fid)
638 {
639                 /* We could just return 1 immediately, but since we should only
640                  * be called in revalidate_it if we already have a lock, let's
641                  * verify that. */
642         struct ldlm_res_id res_id;
643         struct lustre_handle lockh;
644         ldlm_policy_data_t policy;
645         ldlm_mode_t mode;
646         ENTRY;
647
648         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
649         /* As not all attributes are kept under update lock, e.g. 
650            owner/group/acls are under lookup lock, we need both 
651            ibits for GETATTR. */
652         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
653                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
654                 MDS_INODELOCK_LOOKUP;
655
656         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
657                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
658                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
659         if (mode) {
660                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
661                 it->d.lustre.it_lock_mode = mode;
662         }
663
664         RETURN(!!mode);
665 }
666 EXPORT_SYMBOL(mdc_revalidate_lock);
667
/*
 * Digest the reply to an intent lock RPC: turn the server's disposition
 * bits and it_status into a return code for the caller, keep the request
 * referenced for the deferred open/create phases, and, if a matching
 * lock is already cached locally, cancel the newly granted lock and hand
 * back the cached handle instead.
 *
 * \param exp   export the intent RPC was sent through
 * \param req   completed intent request; must be valid and non-NULL
 * \param data  fids/name the intent operated on
 * \param it    lookup intent carrying disposition, status and lock info
 * \param lockh handle of the lock granted by this enqueue; may be
 *              replaced in place by an already-cached matching lock
 *
 * \retval 0 on success; negative errno on failure (-ESTALE when a
 *         fid/name revalidation found a different inode than expected)
 */
static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *req,
                                  struct mdc_op_data *data,
                                  struct lookup_intent *it,
                                  struct lustre_handle *lockh)
{
        struct mds_body *mds_body;
        struct lustre_handle old_lock;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        /* Catch use of a freed (poison-patterned) request before we
         * dereference anything inside it. */
        LASSERT(req != NULL);
        LASSERT(req != LP_POISON);
        LASSERT(req->rq_repmsg != LP_POISON);

        if (!it_disposition(it, DISP_IT_EXECD)) {
                /* The server failed before it even started executing the
                 * intent, i.e. because it couldn't unpack the request. */
                LASSERT(it->d.lustre.it_status != 0);
                RETURN(it->d.lustre.it_status);
        }
        rc = it_open_error(DISP_IT_EXECD, it);
        if (rc)
                RETURN(rc);

        mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
                                  sizeof(*mds_body));
        /* mdc_enqueue checked */
        LASSERT(mds_body != NULL);
        /* mdc_enqueue swabbed */
        LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));

        /* If we were revalidating a fid/name pair, mark the intent in
         * case we fail and get called again from lookup */

        if (data->fid2.id && (it->it_op != IT_GETATTR) &&
           ( !mdc_exp_is_2_0_server(exp) ||
             (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
                it_set_disposition(it, DISP_ENQ_COMPLETE);

                /* Also: did we find the same inode?  Both fid2 and fid3
                 * are compared against the fid the server replied with;
                 * if neither matches, the cached dentry is stale. */
                if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
                    memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
                        RETURN(-ESTALE);
        }

        rc = it_open_error(DISP_LOOKUP_EXECD, it);
        if (rc)
                RETURN(rc);

        /* keep requests around for the multiple phases of the call
         * this shows the DISP_XX must guarantee we make it into the call
         */
        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
            it_disposition(it, DISP_OPEN_CREATE) &&
            !it_open_error(DISP_OPEN_CREATE, it)) {
                it_set_disposition(it, DISP_ENQ_CREATE_REF);
                ptlrpc_request_addref(req); /* balanced in ll_create_node */
        }
        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
            it_disposition(it, DISP_OPEN_OPEN) &&
            !it_open_error(DISP_OPEN_OPEN, it)) {
                it_set_disposition(it, DISP_ENQ_OPEN_REF);
                ptlrpc_request_addref(req); /* balanced in ll_file_open */
                /* BUG 11546 - eviction in the middle of open rpc processing */
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
        }

        /* Sanity-check the disposition bits against the intent type. */
        if (it->it_op & IT_CREAT) {
                /* XXX this belongs in ll_create_it */
        } else if (it->it_op == IT_OPEN) {
                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
        } else {
                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
        }

        /* If we already have a matching lock, then cancel the new
         * one.  We have to set the data here instead of in
         * mdc_enqueue, because we need to use the child's inode as
         * the l_ast_data to match, and that's not available until
         * intent_finish has performed the iget().) */
        lock = ldlm_handle2lock(lockh);
        if (lock) {
                ldlm_policy_data_t policy = lock->l_policy_data;

                LDLM_DEBUG(lock, "matching against this");
                LDLM_LOCK_PUT(lock);
                memcpy(&old_lock, lockh, sizeof(*lockh));
                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
                        /* A cached lock matched: drop and cancel the new
                         * lock, and return the cached handle to the caller
                         * (also recorded in the intent). */
                        ldlm_lock_decref_and_cancel(lockh,
                                                    it->d.lustre.it_lock_mode);
                        memcpy(lockh, &old_lock, sizeof(old_lock));
                        memcpy(&it->d.lustre.it_lock_handle, lockh,
                               sizeof(*lockh));
                }
        }

        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
               data->namelen, data->name, ldlm_it2str(it->it_op),
               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
        RETURN(rc);
}
772
773 /* 
774  * This long block is all about fixing up the lock and request state
775  * so that it is correct as of the moment _before_ the operation was
776  * applied; that way, the VFS will think that everything is normal and
777  * call Lustre's regular VFS methods.
778  *
779  * If we're performing a creation, that means that unless the creation
780  * failed with EEXIST, we should fake up a negative dentry.
781  *
782  * For everything else, we want to lookup to succeed.
783  *
784  * One additional note: if CREATE or OPEN succeeded, we add an extra
785  * reference to the request because we need to keep it around until
786  * ll_create/ll_open gets called.
787  *
788  * The server will return to us, in it_disposition, an indication of
789  * exactly what d.lustre.it_status refers to.
790  *
791  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
792  * otherwise if DISP_OPEN_CREATE is set, then it_status is the
793  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
794  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
795  * was successful.
796  *
797  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
798  * child lookup.
799  */
800 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
801                     void *lmm, int lmmsize, struct lookup_intent *it,
802                     int lookup_flags, struct ptlrpc_request **reqp,
803                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
804 {
805         struct lustre_handle lockh;
806         int rc;
807         ENTRY;
808
809         LASSERT(it);
810
811         CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
812                "intent: %s flags %#o\n",
813                op_data->namelen, op_data->name,
814                PFID(((void *)&op_data->fid2)),
815                PFID(((void *)&op_data->fid1)),
816                ldlm_it2str(it->it_op), it->it_flags);
817
818         lockh.cookie = 0;
819         if (op_data->fid2.id &&
820             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
821                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
822                 /* Only return failure if it was not GETATTR by cfid
823                    (from inode_revalidate) */
824                 if (rc || op_data->namelen != 0)
825                         RETURN(rc);
826         }
827
828         /* lookup_it may be called only after revalidate_it has run, because
829          * revalidate_it cannot return errors, only zero.  Returning zero causes
830          * this call to lookup, which *can* return an error.
831          *
832          * We only want to execute the request associated with the intent one
833          * time, however, so don't send the request again.  Instead, skip past
834          * this and use the request from revalidate.  In this case, revalidate
835          * never dropped its reference, so the refcounts are all OK */
836         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
837                 struct ldlm_enqueue_info einfo =
838                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
839                           ldlm_completion_ast, NULL, NULL };
840
841                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
842                                  lmm, lmmsize, extra_lock_flags);
843                 if (rc < 0)
844                         RETURN(rc);
845         } else if (!op_data->fid2.id) {
846                 /* DISP_ENQ_COMPLETE set means there is extra reference on
847                  * request referenced from this intent, saved for subsequent
848                  * lookup.  This path is executed when we proceed to this
849                  * lookup, so we clear DISP_ENQ_COMPLETE */
850                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
851         }
852
853         *reqp = it->d.lustre.it_data;
854         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
855
856         RETURN(rc);
857 }
858 EXPORT_SYMBOL(mdc_intent_lock);
859
860 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
861                                               void *unused, int rc)
862 {
863         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
864         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
865         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
866         struct lookup_intent     *it;
867         struct lustre_handle     *lockh;
868         struct obd_device        *obddev;
869         int                       flags = LDLM_FL_HAS_INTENT;
870         ENTRY;
871
872         it    = &minfo->mi_it;
873         lockh = &minfo->mi_lockh;
874
875         obddev = class_exp2obd(exp);
876
877         mdc_exit_request(&obddev->u.cli);
878         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
879                 rc = -ETIMEDOUT;
880
881         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
882                                    &flags, NULL, 0, NULL, lockh, rc);
883         if (rc < 0) {
884                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
885                 mdc_clear_replay_flag(req, rc);
886                 GOTO(out, rc);
887         }
888
889         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
890         if (rc)
891                 GOTO(out, rc);
892
893         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
894         GOTO(out, rc);
895 out:
896         OBD_FREE_PTR(einfo);
897         minfo->mi_cb(exp, req, minfo, rc);
898
899         return 0;
900 }
901
902 int mdc_intent_getattr_async(struct obd_export *exp,
903                              struct md_enqueue_info *minfo,
904                              struct ldlm_enqueue_info *einfo)
905 {
906         struct mdc_op_data      *op_data = &minfo->mi_data;
907         struct lookup_intent    *it = &minfo->mi_it;
908         struct ptlrpc_request   *req;
909         struct obd_device       *obddev = class_exp2obd(exp);
910         struct ldlm_res_id res_id;
911         ldlm_policy_data_t       policy = {
912                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
913                                  };
914         int                      rc;
915         int                      flags = LDLM_FL_HAS_INTENT;
916         ENTRY;
917
918         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
919                op_data->namelen, op_data->name, op_data->fid1.id,
920                ldlm_it2str(it->it_op), it->it_flags);
921
922         fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
923         req = mdc_intent_lookup_pack(exp, it, op_data);
924         if (!req)
925                 RETURN(-ENOMEM);
926
927         mdc_enter_request(&obddev->u.cli);
928         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
929                               0, NULL, &minfo->mi_lockh, 1);
930         if (rc < 0) {
931                 mdc_exit_request(&obddev->u.cli);
932                 RETURN(rc);
933         }
934
935         req->rq_async_args.pointer_arg[0] = exp;
936         req->rq_async_args.pointer_arg[1] = minfo;
937         req->rq_async_args.pointer_arg[2] = einfo;
938         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
939         ptlrpcd_add_req(req);
940
941         RETURN(0);
942 }
943 EXPORT_SYMBOL(mdc_intent_getattr_async);