Whamcloud - gitweb
89d7fb1c6bacbe67d71c3b4670487a15b9ca0131
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_OPEN)) {
59                 if (phase >= DISP_OPEN_OPEN)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64
65         if (it_disposition(it, DISP_OPEN_CREATE)) {
66                 if (phase >= DISP_OPEN_CREATE)
67                         return it->d.lustre.it_status;
68                 else
69                         return 0;
70         }
71
72         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
73                 if (phase >= DISP_LOOKUP_EXECD)
74                         return it->d.lustre.it_status;
75                 else
76                         return 0;
77         }
78
79         if (it_disposition(it, DISP_IT_EXECD)) {
80                 if (phase >= DISP_IT_EXECD)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
86                it->d.lustre.it_status);
87         LBUG();
88         return 0;
89 }
90 EXPORT_SYMBOL(it_open_error);
91
92 /* this must be called on a lockh that is known to have a referenced lock */
93 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
94 {
95         struct ldlm_lock *lock;
96         struct lustre_handle *lockh = (struct lustre_handle *)l;
97         ENTRY;
98
99         if(bits)
100                 *bits = 0;
101
102         if (!*l) {
103                 EXIT;
104                 return;
105         }
106
107         lock = ldlm_handle2lock(lockh);
108
109         LASSERT(lock != NULL);
110         lock_res_and_lock(lock);
111 #ifdef __KERNEL__
112         if (lock->l_ast_data && lock->l_ast_data != data) {
113                 struct inode *new_inode = data;
114                 struct inode *old_inode = lock->l_ast_data;
115                 LASSERTF(old_inode->i_state & I_FREEING,
116                          "Found existing inode %p/%lu/%u state %lu in lock: "
117                          "setting data to %p/%lu/%u\n", old_inode,
118                          old_inode->i_ino, old_inode->i_generation,
119                          old_inode->i_state,
120                          new_inode, new_inode->i_ino, new_inode->i_generation);
121         }
122 #endif
123         lock->l_ast_data = data;
124         if (bits)
125                 *bits = lock->l_policy_data.l_inodebits.bits;
126         unlock_res_and_lock(lock);
127         LDLM_LOCK_PUT(lock);
128
129         EXIT;
130 }
131 EXPORT_SYMBOL(mdc_set_lock_data);
132
133 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
134                       ldlm_iterator_t it, void *data)
135 {
136         struct ldlm_res_id res_id;
137         ENTRY;
138
139         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
140         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
141                               it, data);
142
143         EXIT;
144         return 0;
145 }
146
147 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
148 {
149         /* Don't hold error requests for replay. */
150         if (req->rq_replay) {
151                 spin_lock(&req->rq_lock);
152                 req->rq_replay = 0;
153                 spin_unlock(&req->rq_lock);
154         }
155         if (rc && req->rq_transno != 0) {
156                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
157                 LBUG();
158         }
159 }
160
/*
 * Return the smallest power of two strictly greater than \a val
 * (so 0 -> 1, 1 -> 2, 3 -> 4, 4 -> 8).
 *
 * Non-positive input yields 1: a negative value never becomes 0 under an
 * arithmetic right shift, so without this guard the loop would not
 * terminate.  The result saturates at 2^30 so that the left shift cannot
 * overflow a 32-bit signed int.
 */
static int round_up(int val)
{
        const int max_pow2 = 1 << 30;
        int ret = 1;

        if (val <= 0)
                return ret;

        /* One doubling per significant bit of val, capped at max_pow2. */
        while (val != 0 && ret < max_pow2) {
                val >>= 1;
                ret <<= 1;
        }
        return ret;
}
170
171 /* Save a large LOV EA into the request buffer so that it is available
172  * for replay.  We don't do this in the initial request because the
173  * original request doesn't need this buffer (at most it sends just the
174  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
175  * buffer and may also be difficult to allocate and save a very large
176  * request buffer for each open. (bug 5707)
177  *
178  * OOM here may cause recovery failure if lmm is needed (only for the
179  * original open if the MDS crashed just when this client also OOM'd)
180  * but this is incredibly unlikely, and questionable whether the client
181  * could do MDS recovery under OOM anyways... */
182 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
183                                 struct mds_body *body)
184 {
185         int old_len, new_size, old_size;
186         struct lustre_msg *old_msg = req->rq_reqmsg;
187         struct lustre_msg *new_msg;
188         int offset;
189
190         if (mdc_req_is_2_0_server(req))
191                 offset = 4;
192         else
193                 offset = 2;
194
195         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
196         old_size = lustre_packed_msg_size(old_msg);
197         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
198                               body->eadatasize);
199         /* old buffer is more then need */
200         if (old_len > body->eadatasize)
201                 return;
202
203         new_size = lustre_packed_msg_size(old_msg);
204
205         OBD_ALLOC(new_msg, new_size);
206         if (new_msg != NULL) {
207                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
208                           body->eadatasize);
209                 memcpy(new_msg, old_msg, old_size);
210
211                 spin_lock(&req->rq_lock);
212                 req->rq_reqmsg = new_msg;
213                 req->rq_reqlen = new_size;
214                 spin_unlock(&req->rq_lock);
215
216                 OBD_FREE(old_msg, old_size);
217         } else {
218                 lustre_msg_set_buflen(old_msg,
219                                       DLM_INTENT_REC_OFF + offset, old_len);
220                 body->valid &= ~OBD_MD_FLEASIZE;
221                 body->eadatasize = 0;
222         }
223 }
224
225 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
226                                                    struct lookup_intent *it,
227                                                    struct mdc_op_data *data,
228                                                    void *lmm, __u32 lmmsize)
229 {
230         struct ptlrpc_request *req;
231         struct ldlm_intent *lit;
232         struct obd_device *obddev = class_exp2obd(exp);
233         __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
234                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
235                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
236                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
237                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
238                         /* As an optimization, we allocate an RPC request buffer
239                          * for at least a default-sized LOV EA even if we aren't
240                          * sending one.  We grow the whole request to the next
241                          * power-of-two size since we get that much from a slab
242                          * allocation anyways. This avoids an allocation below
243                          * in the common case where we need to save a
244                          * default-sized LOV EA for open replay. */
245                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
246                                          obddev->u.cli.cl_default_mds_easize) };
247         __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
248                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
249                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
250                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
251                                                         cl_max_mds_easize,
252                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
253         CFS_LIST_HEAD(cancels);
254         int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
255         int count = 0;
256         int bufcount = 6;
257         int repbufcount = 5;
258         int mode;
259         int rc;
260         ENTRY;
261
262         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
263         if (mdc_exp_is_2_0_server(exp)) {
264                 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
265                 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
266                 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
267                 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
268                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
269                 bufcount = 8;
270                 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
271                 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
272                 repbufcount = 7;
273         }
274         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
275                              bufcount, size);
276         if (rc & (rc - 1))
277                 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
278                                          (__u32)obddev->u.cli.cl_max_mds_easize);
279
280         /* If inode is known, cancel conflicting OPEN locks. */
281         if (data->fid2.id) {
282                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
283                         mode = LCK_CW;
284 #ifdef FMODE_EXEC
285                 else if (it->it_flags & FMODE_EXEC)
286                         mode = LCK_PR;
287 #endif
288                 else
289                         mode = LCK_CR;
290                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
291                                                 mode, MDS_INODELOCK_OPEN);
292         }
293
294         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
295         if (it->it_op & IT_CREAT || do_join)
296                 mode = LCK_EX;
297         else
298                 mode = LCK_CR;
299         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
300                                          MDS_INODELOCK_UPDATE);
301         if (do_join) {
302                 __u64 head_size = (*(__u64 *)data->data);
303                 /* join is like an unlink of the tail */
304                 if (mdc_exp_is_2_0_server(exp)) {
305                         size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
306                 } else {
307                         size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
308                 }
309                 bufcount++;
310
311                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
312                 if (req)
313                         mdc_join_pack(req, bufcount - 1, data, head_size);
314         } else {
315                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
316                 it->it_create_mode &= ~M_JOIN_FILE;
317         }
318
319         if (req) {
320                 spin_lock(&req->rq_lock);
321                 req->rq_replay = 1;
322                 spin_unlock(&req->rq_lock);
323
324                 /* pack the intent */
325                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
326                                      sizeof(*lit));
327                 lit->opc = (__u64)it->it_op;
328
329                 /* pack the intended request */
330                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
331                               it->it_create_mode, 0, it->it_flags,
332                               lmm, lmmsize);
333
334                 ptlrpc_req_set_repsize(req, repbufcount, repsize);
335         }
336         RETURN(req);
337 }
338
339 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
340                                                      struct lookup_intent *it,
341                                                      struct mdc_op_data *data)
342 {
343         struct ptlrpc_request *req;
344         struct ldlm_intent *lit;
345         struct obd_device *obddev = class_exp2obd(exp);
346         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
347                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
348                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
349                         [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
350                                                 sizeof(struct mdt_rec_unlink) :
351                                                 sizeof(struct mds_rec_unlink),
352                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
353         __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
354                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
355                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
356                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
357                                                         cl_max_mds_easize,
358                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
359                                                         cl_max_mds_cookiesize };
360         ENTRY;
361
362         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
363         if (req) {
364                 /* pack the intent */
365                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
366                                      sizeof(*lit));
367                 lit->opc = (__u64)it->it_op;
368
369                 /* pack the intended request */
370                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
371
372                 ptlrpc_req_set_repsize(req, 5, repsize);
373         }
374         RETURN(req);
375 }
376
377 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
378                                                      struct lookup_intent *it,
379                                                      struct mdc_op_data *data)
380 {
381         struct ptlrpc_request *req;
382         struct ldlm_intent *lit;
383         struct obd_device *obddev = class_exp2obd(exp);
384         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
385                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
386                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
387                         [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
388                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
389                         [DLM_INTENT_REC_OFF+2]= 0 };
390         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
391                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
392                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
393                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
394                                                         cl_max_mds_easize,
395                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
396                            [DLM_REPLY_REC_OFF+3] = 0 };
397         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
398                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
399         int bufcount = 5;
400         ENTRY;
401
402         if (mdc_exp_is_2_0_server(exp)) {
403                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
404                 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
405                 bufcount = 6;
406         }
407         req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
408         if (req) {
409                 /* pack the intent */
410                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
411                                      sizeof(*lit));
412                 lit->opc = (__u64)it->it_op;
413
414                 /* pack the intended request */
415                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
416                                  data);
417                 ptlrpc_req_set_repsize(req, bufcount, repsize);
418         }
419         RETURN(req);
420 }
421
422 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
423 {
424         struct ptlrpc_request *req;
425         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
426                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
427         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
428                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
429                            [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
430         ENTRY;
431
432         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
433         if (req)
434                 ptlrpc_req_set_repsize(req, 3, repsize);
435         RETURN(req);
436 }
437
438 static int mdc_finish_enqueue(struct obd_export *exp,
439                               struct ptlrpc_request *req,
440                               struct ldlm_enqueue_info *einfo,
441                               struct lookup_intent *it,
442                               struct lustre_handle *lockh,
443                               int rc)
444 {
445         struct ldlm_request *lockreq;
446         struct ldlm_reply *lockrep;
447         ENTRY;
448
449         LASSERT(rc >= 0);
450         /* Similarly, if we're going to replay this request, we don't want to
451          * actually get a lock, just perform the intent. */
452         if (req->rq_transno || req->rq_replay) {
453                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
454                                          sizeof(*lockreq));
455                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
456         }
457
458         if (rc == ELDLM_LOCK_ABORTED) {
459                 einfo->ei_mode = 0;
460                 memset(lockh, 0, sizeof(*lockh));
461                 rc = 0;
462         } else { /* rc = 0 */
463                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
464                 LASSERT(lock);
465
466                 /* If the server gave us back a different lock mode, we should
467                  * fix up our variables. */
468                 if (lock->l_req_mode != einfo->ei_mode) {
469                         ldlm_lock_addref(lockh, lock->l_req_mode);
470                         ldlm_lock_decref(lockh, einfo->ei_mode);
471                         einfo->ei_mode = lock->l_req_mode;
472                 }
473                 LDLM_LOCK_PUT(lock);
474         }
475
476         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
477                                  sizeof(*lockrep));
478         LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
479         /* swabbed by ldlm_cli_enqueue() */
480         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
481
482         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
483         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
484         it->d.lustre.it_lock_mode = einfo->ei_mode;
485         it->d.lustre.it_lock_handle = lockh->cookie;
486         it->d.lustre.it_data = req;
487
488         if (it->d.lustre.it_status < 0 && req->rq_replay)
489                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
490
491         /* If we're doing an IT_OPEN which did not result in an actual
492          * successful open, then we need to remove the bit which saves
493          * this request for unconditional replay.
494          *
495          * It's important that we do this first!  Otherwise we might exit the
496          * function without doing so, and try to replay a failed create
497          * (bug 3440) */
498         if ((it->it_op & IT_OPEN) &&
499             req->rq_replay &&
500             (!it_disposition(it, DISP_OPEN_OPEN) ||
501              it->d.lustre.it_status != 0))
502                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
503
504         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
505                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
506
507         /* We know what to expect, so we do any byte flipping required here */
508         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
509                 struct mds_body *body;
510
511                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
512                                          lustre_swab_mds_body);
513                 if (body == NULL) {
514                         CERROR ("Can't swab mds_body\n");
515                         RETURN (-EPROTO);
516                 }
517
518                 /* If this is a successful OPEN request, we need to set
519                    replay handler and data early, so that if replay happens
520                    immediately after swabbing below, new reply is swabbed
521                    by that handler correctly */
522                 if (it_disposition(it, DISP_OPEN_OPEN) &&
523                     !it_open_error(DISP_OPEN_OPEN, it))
524                         mdc_set_open_replay_data(NULL, req);
525
526                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
527                         void *eadata;
528
529                         /* The eadata is opaque; just check that it is there.
530                          * Eventually, obd_unpackmd() will check the contents */
531                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
532                                                     body->eadatasize, NULL);
533                         if (eadata == NULL) {
534                                 CERROR ("Missing/short eadata\n");
535                                 RETURN (-EPROTO);
536                         }
537                         /* We save the reply LOV EA in case we have to replay
538                          * a create for recovery.  If we didn't allocate a
539                          * large enough request buffer above we need to
540                          * reallocate it here to hold the actual LOV EA. */
541                         if (it->it_op & IT_OPEN) {
542                                 int offset = DLM_INTENT_REC_OFF;
543                                 void *lmm;
544
545                                 if (mdc_req_is_2_0_server(req))
546                                         offset += 4;
547                                 else
548                                         offset += 2;
549
550                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
551                                     body->eadatasize)
552                                         mdc_realloc_openmsg(req, body);
553
554                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
555                                                      body->eadatasize);
556                                 if (lmm)
557                                         memcpy(lmm, eadata, body->eadatasize);
558                         }
559                 }
560         }
561
562         RETURN(rc);
563 }
564
565 /* We always reserve enough space in the reply packet for a stripe MD, because
566  * we don't know in advance the file type. */
567 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
568                 struct lookup_intent *it, struct mdc_op_data *data,
569                 struct lustre_handle *lockh, void *lmm, int lmmsize,
570                 int extra_lock_flags)
571 {
572         struct ptlrpc_request *req;
573         struct obd_device *obddev = class_exp2obd(exp);
574         struct ldlm_res_id res_id;
575         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
576         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
577         int rc;
578         ENTRY;
579
580         fid_build_reg_res_name((void *)&data->fid1, &res_id);
581         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
582         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
583                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
584
585         if (it->it_op & IT_OPEN) {
586                 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
587                         struct client_obd *cli = &obddev->u.cli;
588                         data->fid3 = data->fid2;
589                         rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
590                         if (rc) {
591                                 CERROR("fid allocation result: %d\n", rc);
592                                 RETURN(rc);
593                         }
594                 }
595                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
596                 if (it->it_create_mode & M_JOIN_FILE) {
597                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
598                 }
599         } else if (it->it_op & IT_UNLINK) {
600                 req = mdc_intent_unlink_pack(exp, it, data);
601         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
602                 req = mdc_intent_lookup_pack(exp, it, data);
603         } else if (it->it_op == IT_READDIR) {
604                 req = mdc_intent_readdir_pack(exp);
605         } else {
606                 CERROR("bad it_op %x\n", it->it_op);
607                 RETURN(-EINVAL);
608         }
609
610         if (!req)
611                 RETURN(-ENOMEM);
612
613          /* It is important to obtain rpc_lock first (if applicable), so that
614           * threads that are serialised with rpc_lock are not polluting our
615           * rpcs in flight counter */
616         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
617         mdc_enter_request(&obddev->u.cli);
618         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
619                               0, NULL, lockh, 0);
620         mdc_exit_request(&obddev->u.cli);
621         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
622         if (rc < 0) {
623                 CERROR("ldlm_cli_enqueue: %d\n", rc);
624                 mdc_clear_replay_flag(req, rc);
625                 ptlrpc_req_finished(req);
626                 RETURN(rc);
627         }
628         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
629
630         RETURN(rc);
631 }
632 EXPORT_SYMBOL(mdc_enqueue);
633
634 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
635                         struct ll_fid *fid)
636 {
637                 /* We could just return 1 immediately, but since we should only
638                  * be called in revalidate_it if we already have a lock, let's
639                  * verify that. */
640         struct ldlm_res_id res_id;
641         struct lustre_handle lockh;
642         ldlm_policy_data_t policy;
643         ldlm_mode_t mode;
644         ENTRY;
645
646         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
647         /* As not all attributes are kept under update lock, e.g. 
648            owner/group/acls are under lookup lock, we need both 
649            ibits for GETATTR. */
650         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
651                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
652                 MDS_INODELOCK_LOOKUP;
653
654         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
655                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
656                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
657         if (mode) {
658                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
659                 it->d.lustre.it_lock_mode = mode;
660         }
661
662         RETURN(!!mode);
663 }
664 EXPORT_SYMBOL(mdc_revalidate_lock);
665
666 static int mdc_finish_intent_lock(struct obd_export *exp,
667                                   struct ptlrpc_request *req,
668                                   struct mdc_op_data *data,
669                                   struct lookup_intent *it,
670                                   struct lustre_handle *lockh)
671 {
672         struct mds_body *mds_body;
673         struct lustre_handle old_lock;
674         struct ldlm_lock *lock;
675         int rc;
676         ENTRY;
677
678         LASSERT(req != NULL);
679         LASSERT(req != LP_POISON);
680         LASSERT(req->rq_repmsg != LP_POISON);
681
682         if (!it_disposition(it, DISP_IT_EXECD)) {
683                 /* The server failed before it even started executing the
684                  * intent, i.e. because it couldn't unpack the request. */
685                 LASSERT(it->d.lustre.it_status != 0);
686                 RETURN(it->d.lustre.it_status);
687         }
688         rc = it_open_error(DISP_IT_EXECD, it);
689         if (rc)
690                 RETURN(rc);
691
692         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
693                                   sizeof(*mds_body));
694         /* mdc_enqueue checked */
695         LASSERT(mds_body != NULL);
696         /* mdc_enqueue swabbed */
697         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
698
699         /* If we were revalidating a fid/name pair, mark the intent in
700          * case we fail and get called again from lookup */
701
702         if (data->fid2.id && (it->it_op != IT_GETATTR) &&
703            ( !mdc_exp_is_2_0_server(exp) ||
704              (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
705                 it_set_disposition(it, DISP_ENQ_COMPLETE);
706
707                 /* Also: did we find the same inode? */
708                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
709                     memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
710                         RETURN(-ESTALE);
711         }
712
713         rc = it_open_error(DISP_LOOKUP_EXECD, it);
714         if (rc)
715                 RETURN(rc);
716
717         /* keep requests around for the multiple phases of the call
718          * this shows the DISP_XX must guarantee we make it into the call
719          */
720         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
721             it_disposition(it, DISP_OPEN_CREATE) &&
722             !it_open_error(DISP_OPEN_CREATE, it)) {
723                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
724                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
725         }
726         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
727             it_disposition(it, DISP_OPEN_OPEN) &&
728             !it_open_error(DISP_OPEN_OPEN, it)) {
729                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
730                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
731                 /* BUG 11546 - eviction in the middle of open rpc processing */
732                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
733         }
734
735         if (it->it_op & IT_CREAT) {
736                 /* XXX this belongs in ll_create_it */
737         } else if (it->it_op == IT_OPEN) {
738                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
739         } else {
740                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
741         }
742
743         /* If we already have a matching lock, then cancel the new
744          * one.  We have to set the data here instead of in
745          * mdc_enqueue, because we need to use the child's inode as
746          * the l_ast_data to match, and that's not available until
747          * intent_finish has performed the iget().) */
748         lock = ldlm_handle2lock(lockh);
749         if (lock) {
750                 ldlm_policy_data_t policy = lock->l_policy_data;
751
752                 LDLM_DEBUG(lock, "matching against this");
753                 LDLM_LOCK_PUT(lock);
754                 memcpy(&old_lock, lockh, sizeof(*lockh));
755                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
756                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
757                         ldlm_lock_decref_and_cancel(lockh,
758                                                     it->d.lustre.it_lock_mode);
759                         memcpy(lockh, &old_lock, sizeof(old_lock));
760                         memcpy(&it->d.lustre.it_lock_handle, lockh,
761                                sizeof(*lockh));
762                 }
763         }
764
765         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
766                data->namelen, data->name, ldlm_it2str(it->it_op),
767                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
768         RETURN(rc);
769 }
770
771 /* 
772  * This long block is all about fixing up the lock and request state
773  * so that it is correct as of the moment _before_ the operation was
774  * applied; that way, the VFS will think that everything is normal and
775  * call Lustre's regular VFS methods.
776  *
777  * If we're performing a creation, that means that unless the creation
778  * failed with EEXIST, we should fake up a negative dentry.
779  *
780  * For everything else, we want to lookup to succeed.
781  *
782  * One additional note: if CREATE or OPEN succeeded, we add an extra
783  * reference to the request because we need to keep it around until
784  * ll_create/ll_open gets called.
785  *
786  * The server will return to us, in it_disposition, an indication of
787  * exactly what d.lustre.it_status refers to.
788  *
789  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
790  * otherwise if DISP_OPEN_CREATE is set, then it status is the
791  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
792  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
793  * was successful.
794  *
795  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
796  * child lookup.
797  */
798 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
799                     void *lmm, int lmmsize, struct lookup_intent *it,
800                     int lookup_flags, struct ptlrpc_request **reqp,
801                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
802 {
803         struct lustre_handle lockh;
804         int rc;
805         ENTRY;
806
807         LASSERT(it);
808
809         CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
810                "intent: %s flags %#o\n",
811                op_data->namelen, op_data->name,
812                PFID(((void *)&op_data->fid2)),
813                PFID(((void *)&op_data->fid1)),
814                ldlm_it2str(it->it_op), it->it_flags);
815
816         lockh.cookie = 0;
817         if (op_data->fid2.id &&
818             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
819                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
820                 /* Only return failure if it was not GETATTR by cfid
821                    (from inode_revalidate) */
822                 if (rc || op_data->namelen != 0)
823                         RETURN(rc);
824         }
825
826         /* lookup_it may be called only after revalidate_it has run, because
827          * revalidate_it cannot return errors, only zero.  Returning zero causes
828          * this call to lookup, which *can* return an error.
829          *
830          * We only want to execute the request associated with the intent one
831          * time, however, so don't send the request again.  Instead, skip past
832          * this and use the request from revalidate.  In this case, revalidate
833          * never dropped its reference, so the refcounts are all OK */
834         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
835                 struct ldlm_enqueue_info einfo =
836                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
837                           ldlm_completion_ast, NULL, NULL };
838
839                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
840                                  lmm, lmmsize, extra_lock_flags);
841                 if (rc < 0)
842                         RETURN(rc);
843         } else if (!op_data->fid2.id) {
844                 /* DISP_ENQ_COMPLETE set means there is extra reference on
845                  * request referenced from this intent, saved for subsequent
846                  * lookup.  This path is executed when we proceed to this
847                  * lookup, so we clear DISP_ENQ_COMPLETE */
848                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
849         }
850
851         *reqp = it->d.lustre.it_data;
852         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
853
854         RETURN(rc);
855 }
856 EXPORT_SYMBOL(mdc_intent_lock);
857
858 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
859                                               void *unused, int rc)
860 {
861         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
862         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
863         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
864         struct lookup_intent     *it;
865         struct lustre_handle     *lockh;
866         struct obd_device        *obddev;
867         int                       flags = LDLM_FL_HAS_INTENT;
868         ENTRY;
869
870         it    = &minfo->mi_it;
871         lockh = &minfo->mi_lockh;
872
873         obddev = class_exp2obd(exp);
874
875         mdc_exit_request(&obddev->u.cli);
876         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
877                 rc = -ETIMEDOUT;
878
879         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
880                                    &flags, NULL, 0, NULL, lockh, rc);
881         if (rc < 0) {
882                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
883                 mdc_clear_replay_flag(req, rc);
884                 GOTO(out, rc);
885         }
886
887         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
888         if (rc)
889                 GOTO(out, rc);
890
891         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
892         GOTO(out, rc);
893 out:
894         OBD_FREE_PTR(einfo);
895         minfo->mi_cb(exp, req, minfo, rc);
896
897         return 0;
898 }
899
900 int mdc_intent_getattr_async(struct obd_export *exp,
901                              struct md_enqueue_info *minfo,
902                              struct ldlm_enqueue_info *einfo)
903 {
904         struct mdc_op_data      *op_data = &minfo->mi_data;
905         struct lookup_intent    *it = &minfo->mi_it;
906         struct ptlrpc_request   *req;
907         struct obd_device       *obddev = class_exp2obd(exp);
908         struct ldlm_res_id res_id;
909         ldlm_policy_data_t       policy = {
910                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
911                                  };
912         int                      rc;
913         int                      flags = LDLM_FL_HAS_INTENT;
914         ENTRY;
915
916         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
917                op_data->namelen, op_data->name, op_data->fid1.id,
918                ldlm_it2str(it->it_op), it->it_flags);
919
920         fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
921         req = mdc_intent_lookup_pack(exp, it, op_data);
922         if (!req)
923                 RETURN(-ENOMEM);
924
925         mdc_enter_request(&obddev->u.cli);
926         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
927                               0, NULL, &minfo->mi_lockh, 1);
928         if (rc < 0) {
929                 mdc_exit_request(&obddev->u.cli);
930                 RETURN(rc);
931         }
932
933         req->rq_async_args.pointer_arg[0] = exp;
934         req->rq_async_args.pointer_arg[1] = minfo;
935         req->rq_async_args.pointer_arg[2] = einfo;
936         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
937         ptlrpcd_add_req(req);
938
939         RETURN(0);
940 }
941 EXPORT_SYMBOL(mdc_intent_getattr_async);