Whamcloud - gitweb
a21992c5dfe50535f072842f8dfc16a4b1b7730e
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
115 {
116         struct ldlm_lock *lock;
117         ENTRY;
118
119         if (!*lockh) {
120                 EXIT;
121                 RETURN(0);
122         }
123
124         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
125
126         LASSERT(lock != NULL);
127         lock_res_and_lock(lock);
128 #ifdef __KERNEL__
129         if (lock->l_ast_data && lock->l_ast_data != data) {
130                 struct inode *new_inode = data;
131                 struct inode *old_inode = lock->l_ast_data;
132                 LASSERTF(old_inode->i_state & I_FREEING,
133                          "Found existing inode %p/%lu/%u state %lu in lock: "
134                          "setting data to %p/%lu/%u\n", old_inode,
135                          old_inode->i_ino, old_inode->i_generation,
136                          old_inode->i_state,
137                          new_inode, new_inode->i_ino, new_inode->i_generation);
138         }
139 #endif
140         lock->l_ast_data = data;
141         unlock_res_and_lock(lock);
142         LDLM_LOCK_PUT(lock);
143
144         RETURN(0);
145 }
146
147 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
148                            const struct lu_fid *fid, ldlm_type_t type,
149                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
150                            struct lustre_handle *lockh)
151 {
152         struct ldlm_res_id res_id;
153         ldlm_mode_t rc;
154         ENTRY;
155
156         fid_build_reg_res_name(fid, &res_id);
157         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
158                              &res_id, type, policy, mode, lockh);
159         RETURN(rc);
160 }
161
162 int mdc_cancel_unused(struct obd_export *exp,
163                       const struct lu_fid *fid,
164                       ldlm_policy_data_t *policy,
165                       ldlm_mode_t mode, int flags, void *opaque)
166 {
167         struct ldlm_res_id res_id;
168         struct obd_device *obd = class_exp2obd(exp);
169         int rc;
170
171         ENTRY;
172
173         fid_build_reg_res_name(fid, &res_id);
174         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175                                              policy, mode, flags, opaque);
176         RETURN(rc);
177 }
178
179 int mdc_change_cbdata(struct obd_export *exp,
180                       const struct lu_fid *fid,
181                       ldlm_iterator_t it, void *data)
182 {
183         struct ldlm_res_id res_id;
184         ENTRY;
185
186         fid_build_reg_res_name(fid, &res_id);
187         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
188                               &res_id, it, data);
189
190         EXIT;
191         return 0;
192 }
193
194 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 {
196         /* Don't hold error requests for replay. */
197         if (req->rq_replay) {
198                 spin_lock(&req->rq_lock);
199                 req->rq_replay = 0;
200                 spin_unlock(&req->rq_lock);
201         }
202         if (rc && req->rq_transno != 0) {
203                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
204                 LBUG();
205         }
206 }
207
208 /* Save a large LOV EA into the request buffer so that it is available
209  * for replay.  We don't do this in the initial request because the
210  * original request doesn't need this buffer (at most it sends just the
211  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212  * buffer and may also be difficult to allocate and save a very large
213  * request buffer for each open. (bug 5707)
214  *
215  * OOM here may cause recovery failure if lmm is needed (only for the
216  * original open if the MDS crashed just when this client also OOM'd)
217  * but this is incredibly unlikely, and questionable whether the client
218  * could do MDS recovery under OOM anyways... */
219 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
220                                 struct mdt_body *body)
221 {
222         int     rc;
223
224         /* FIXME: remove this explicit offset. */
225         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
226                                         body->eadatasize);
227         if (rc) {
228                 CERROR("Can't enlarge segment %d size to %d\n",
229                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
230                 body->valid &= ~OBD_MD_FLEASIZE;
231                 body->eadatasize = 0;
232         }
233 }
234
235 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
236                                                    struct lookup_intent *it,
237                                                    struct md_op_data *op_data,
238                                                    void *lmm, int lmmsize,
239                                                    void *cb_data)
240 {
241         struct ptlrpc_request *req;
242         struct obd_device     *obddev = class_exp2obd(exp);
243         struct ldlm_intent    *lit;
244         int                    joinfile = !!((it->it_flags & O_JOIN_FILE) && 
245                                               op_data->op_data);
246         CFS_LIST_HEAD(cancels);
247         int                    count = 0;
248         int                    mode;
249         int                    rc;
250         ENTRY;
251
252         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
253
254         /* XXX: openlock is not cancelled for cross-refs. */
255         /* If inode is known, cancel conflicting OPEN locks. */
256         if (fid_is_sane(&op_data->op_fid2)) {
257                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
258                         mode = LCK_CW;
259 #ifdef FMODE_EXEC
260                 else if (it->it_flags & FMODE_EXEC)
261                         mode = LCK_PR;
262 #endif
263                 else
264                         mode = LCK_CR;
265                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
266                                                 &cancels, mode,
267                                                 MDS_INODELOCK_OPEN);
268         }
269
270         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
271         if (it->it_op & IT_CREAT || joinfile)
272                 mode = LCK_EX;
273         else
274                 mode = LCK_CR;
275         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
276                                          &cancels, mode,
277                                          MDS_INODELOCK_UPDATE);
278
279         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
280                                    &RQF_LDLM_INTENT_OPEN);
281         if (req == NULL) {
282                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
283                 RETURN(ERR_PTR(-ENOMEM));
284         }
285
286         /* parent capability */
287         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
288         /* child capability, reserve the size according to parent capa, it will
289          * be filled after we get the reply */
290         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293                              op_data->op_namelen + 1);
294         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
296         if (!joinfile) {
297                 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
298                                      RCL_CLIENT, 0);
299         }
300
301         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
302         if (rc) {
303                 ptlrpc_request_free(req);
304                 return NULL;
305         }
306
307         if (joinfile) {
308                 __u64 head_size = *(__u64 *)op_data->op_data;
309                 mdc_join_pack(req, op_data, head_size);
310         }
311
312         spin_lock(&req->rq_lock);
313         req->rq_replay = 1;
314         spin_unlock(&req->rq_lock);
315
316         /* pack the intent */
317         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
318         lit->opc = (__u64)it->it_op;
319
320         /* pack the intended request */
321         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
322                       lmmsize);
323
324         /* for remote client, fetch remote perm for current user */
325         if (client_is_remote(exp))
326                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
327                                      sizeof(struct mdt_remote_perm));
328         ptlrpc_request_set_replen(req);
329         return req;
330 }
331
332 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
333                                                      struct lookup_intent *it,
334                                                      struct md_op_data *op_data)
335 {
336         struct ptlrpc_request *req;
337         struct obd_device     *obddev = class_exp2obd(exp);
338         struct ldlm_intent    *lit;
339         int                    rc;
340         ENTRY;
341
342         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343                                    &RQF_LDLM_INTENT_UNLINK);
344         if (req == NULL)
345                 RETURN(ERR_PTR(-ENOMEM));
346
347         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
348         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
349                              op_data->op_namelen + 1);
350
351         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
352         if (rc) {
353                 ptlrpc_request_free(req);
354                 RETURN(ERR_PTR(rc));
355         }
356
357         /* pack the intent */
358         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
359         lit->opc = (__u64)it->it_op;
360
361         /* pack the intended request */
362         mdc_unlink_pack(req, op_data);
363
364         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
365                              obddev->u.cli.cl_max_mds_easize);
366         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
367                              obddev->u.cli.cl_max_mds_cookiesize);
368         ptlrpc_request_set_replen(req);
369         RETURN(req);
370 }
371
372 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
373                                                       struct lookup_intent *it,
374                                                       struct md_op_data *op_data)
375 {
376         struct ptlrpc_request *req;
377         struct obd_device     *obddev = class_exp2obd(exp);
378         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
379                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
380                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
381                                        (client_is_remote(exp) ?
382                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
383         struct ldlm_intent    *lit;
384         int                    rc;
385         ENTRY;
386
387         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388                                    &RQF_LDLM_INTENT_GETATTR);
389         if (req == NULL)
390                 RETURN(ERR_PTR(-ENOMEM));
391
392         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
393         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
394                              op_data->op_namelen + 1);
395
396         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
397         if (rc) {
398                 ptlrpc_request_free(req);
399                 RETURN(ERR_PTR(rc));
400         }
401
402         /* pack the intent */
403         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404         lit->opc = (__u64)it->it_op;
405
406         /* pack the intended request */
407         mdc_getattr_pack(req, valid, it->it_flags, op_data);
408
409         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
410                              obddev->u.cli.cl_max_mds_easize);
411         if (client_is_remote(exp))
412                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
413                                      sizeof(struct mdt_remote_perm));
414         ptlrpc_request_set_replen(req);
415         RETURN(req);
416 }
417
418 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
419 {
420         struct ptlrpc_request *req;
421         int rc;
422         ENTRY;
423
424         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
425         if (req == NULL)
426                 RETURN(ERR_PTR(-ENOMEM));
427
428         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
429         if (rc) {
430                 ptlrpc_request_free(req);
431                 RETURN(ERR_PTR(rc));
432         }
433
434         ptlrpc_request_set_replen(req);
435         RETURN(req);
436 }
437
438 static int mdc_finish_enqueue(struct obd_export *exp,
439                               struct ptlrpc_request *req,
440                               struct ldlm_enqueue_info *einfo,
441                               struct lookup_intent *it,
442                               struct lustre_handle *lockh,
443                               int rc)
444 {
445         struct req_capsule  *pill = &req->rq_pill;
446         struct ldlm_request *lockreq;
447         struct ldlm_reply   *lockrep;
448         ENTRY;
449
450         LASSERT(rc >= 0);
451         /* Similarly, if we're going to replay this request, we don't want to
452          * actually get a lock, just perform the intent. */
453         if (req->rq_transno || req->rq_replay) {
454                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
455                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
456         }
457
458         if (rc == ELDLM_LOCK_ABORTED) {
459                 einfo->ei_mode = 0;
460                 memset(lockh, 0, sizeof(*lockh));
461                 rc = 0;
462         } else { /* rc = 0 */
463                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
464                 LASSERT(lock);
465
466                 /* If the server gave us back a different lock mode, we should
467                  * fix up our variables. */
468                 if (lock->l_req_mode != einfo->ei_mode) {
469                         ldlm_lock_addref(lockh, lock->l_req_mode);
470                         ldlm_lock_decref(lockh, einfo->ei_mode);
471                         einfo->ei_mode = lock->l_req_mode;
472                 }
473                 LDLM_LOCK_PUT(lock);
474         }
475
476         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
477         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
478
479         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
480         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
481         it->d.lustre.it_lock_mode = einfo->ei_mode;
482         it->d.lustre.it_data = req;
483
484         if (it->d.lustre.it_status < 0 && req->rq_replay)
485                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
486
487         /* If we're doing an IT_OPEN which did not result in an actual
488          * successful open, then we need to remove the bit which saves
489          * this request for unconditional replay.
490          *
491          * It's important that we do this first!  Otherwise we might exit the
492          * function without doing so, and try to replay a failed create
493          * (bug 3440) */
494         if (it->it_op & IT_OPEN && req->rq_replay &&
495             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
496                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
497
498         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
499                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
500
501         /* We know what to expect, so we do any byte flipping required here */
502         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
503                 struct mdt_body *body;
504
505                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
506                 if (body == NULL) {
507                         CERROR ("Can't swab mdt_body\n");
508                         RETURN (-EPROTO);
509                 }
510
511                 if (it_disposition(it, DISP_OPEN_OPEN) &&
512                     !it_open_error(DISP_OPEN_OPEN, it)) {
513                         /*
514                          * If this is a successful OPEN request, we need to set
515                          * replay handler and data early, so that if replay
516                          * happens immediately after swabbing below, new reply
517                          * is swabbed by that handler correctly.
518                          */
519                         mdc_set_open_replay_data(NULL, NULL, req);
520                 }
521
522                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
523                         void *eadata;
524
525                         /*
526                          * The eadata is opaque; just check that it is there.
527                          * Eventually, obd_unpackmd() will check the contents.
528                          */
529                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
530                                                               body->eadatasize);
531                         if (eadata == NULL)
532                                 RETURN(-EPROTO);
533
534                         if (body->valid & OBD_MD_FLMODEASIZE) {
535                                 struct obd_device *obddev = class_exp2obd(exp);
536
537                                 if (obddev->u.cli.cl_max_mds_easize <
538                                     body->max_mdsize) {
539                                         obddev->u.cli.cl_max_mds_easize =
540                                                 body->max_mdsize;
541                                         CDEBUG(D_INFO, "maxeasize become %d\n",
542                                                body->max_mdsize);
543                                 }
544                                 if (obddev->u.cli.cl_max_mds_cookiesize <
545                                     body->max_cookiesize) {
546                                         obddev->u.cli.cl_max_mds_cookiesize =
547                                                 body->max_cookiesize;
548                                         CDEBUG(D_INFO, "cookiesize become %d\n",
549                                                body->max_cookiesize);
550                                 }
551                         }
552
553                         /*
554                          * We save the reply LOV EA in case we have to replay a
555                          * create for recovery.  If we didn't allocate a large
556                          * enough request buffer above we need to reallocate it
557                          * here to hold the actual LOV EA.
558                          *
559                          * To not save LOV EA if request is not going to replay
560                          * (for example error one).
561                          */
562                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
563                                 void *lmm;
564                                 if (req_capsule_get_size(pill, &RMF_EADATA,
565                                                          RCL_CLIENT) <
566                                     body->eadatasize) {
567                                         mdc_realloc_openmsg(req, body);
568                                         req_capsule_set_size(pill, &RMF_EADATA,
569                                                              RCL_CLIENT,
570                                                              body->eadatasize);
571                                 }
572                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
573                                 if (lmm)
574                                         memcpy(lmm, eadata, body->eadatasize);
575                         }
576                 }
577
578                 if (body->valid & OBD_MD_FLRMTPERM) {
579                         struct mdt_remote_perm *perm;
580
581                         LASSERT(client_is_remote(exp));
582                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
583                                                 lustre_swab_mdt_remote_perm);
584                         if (perm == NULL)
585                                 RETURN(-EPROTO);
586                 }
587                 if (body->valid & OBD_MD_FLMDSCAPA) {
588                         struct lustre_capa *capa, *p;
589
590                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
591                         if (capa == NULL)
592                                 RETURN(-EPROTO);
593
594                         if (it->it_op & IT_OPEN) {
595                                 /* client fid capa will be checked in replay */
596                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
597                                 LASSERT(p);
598                                 *p = *capa;
599                         }
600                 }
601                 if (body->valid & OBD_MD_FLOSSCAPA) {
602                         struct lustre_capa *capa;
603
604                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
605                         if (capa == NULL)
606                                 RETURN(-EPROTO);
607                 }
608         }
609
610         RETURN(rc);
611 }
612
613 /* We always reserve enough space in the reply packet for a stripe MD, because
614  * we don't know in advance the file type. */
615 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
616                 struct lookup_intent *it, struct md_op_data *op_data,
617                 struct lustre_handle *lockh, void *lmm, int lmmsize,
618                 struct ptlrpc_request **reqp, int extra_lock_flags)
619 {
620         struct obd_device     *obddev = class_exp2obd(exp);
621         struct ptlrpc_request *req = NULL;
622         struct req_capsule    *pill;
623         int                    flags = extra_lock_flags;
624         int                    rc;
625         struct ldlm_res_id res_id;
626         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
627         ENTRY;
628
629         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
630                  einfo->ei_type);
631
632         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
633
634         if (it)
635                 flags |= LDLM_FL_HAS_INTENT;
636         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
637                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
638
639         if (reqp)
640                 req = *reqp;
641
642         if (!it) {
643                 /* The only way right now is FLOCK, in this case we hide flock
644                    policy as lmm, but lmmsize is 0 */
645                 LASSERT(lmm && lmmsize == 0);
646                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
647                          einfo->ei_type);
648                 policy = *(ldlm_policy_data_t *)lmm;
649                 res_id.name[3] = LDLM_FLOCK;
650         } else if (it->it_op & IT_OPEN) {
651                 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
652                                               op_data->op_data);
653
654                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
655                                            einfo->ei_cbdata);
656                 if (!joinfile) {
657                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
658                         einfo->ei_cbdata = NULL;
659                         lmm = NULL;
660                 } else
661                         it->it_flags &= ~O_JOIN_FILE;
662         } else if (it->it_op & IT_UNLINK)
663                 req = mdc_intent_unlink_pack(exp, it, op_data);
664         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
665                 req = mdc_intent_getattr_pack(exp, it, op_data);
666         else if (it->it_op == IT_READDIR)
667                 req = ldlm_enqueue_pack(exp);
668         else {
669                 LBUG();
670                 RETURN(-EINVAL);
671         }
672
673         if (IS_ERR(req))
674                 RETURN(PTR_ERR(req));
675         pill = &req->rq_pill;
676
677         /* It is important to obtain rpc_lock first (if applicable), so that
678          * threads that are serialised with rpc_lock are not polluting our
679          * rpcs in flight counter. We do not do flock request limiting, though*/
680         if (it) {
681                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
682                 mdc_enter_request(&obddev->u.cli);
683         }
684         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
685                               0, NULL, lockh, 0);
686         if (reqp)
687                 *reqp = req;
688
689         if (it) {
690                 mdc_exit_request(&obddev->u.cli);
691                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
692         }
693         if (!it) {
694                 /* For flock requests we immediatelly return without further
695                    delay and let caller deal with the rest, since rest of
696                    this function metadata processing makes no sense for flock
697                    requests anyway */
698                 RETURN(rc);
699         }
700
701         if (rc < 0) {
702                 CERROR("ldlm_cli_enqueue: %d\n", rc);
703                 mdc_clear_replay_flag(req, rc);
704                 ptlrpc_req_finished(req);
705                 RETURN(rc);
706         }
707         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
708
709         RETURN(rc);
710 }
711
712 static int mdc_finish_intent_lock(struct obd_export *exp,
713                                   struct ptlrpc_request *request,
714                                   struct md_op_data *op_data,
715                                   struct lookup_intent *it,
716                                   struct lustre_handle *lockh)
717 {
718         struct lustre_handle old_lock;
719         struct mdt_body *mdt_body;
720         struct ldlm_lock *lock;
721         int rc;
722
723
724         LASSERT(request != NULL);
725         LASSERT(request != LP_POISON);
726         LASSERT(request->rq_repmsg != LP_POISON);
727
728         if (!it_disposition(it, DISP_IT_EXECD)) {
729                 /* The server failed before it even started executing the
730                  * intent, i.e. because it couldn't unpack the request. */
731                 LASSERT(it->d.lustre.it_status != 0);
732                 RETURN(it->d.lustre.it_status);
733         }
734         rc = it_open_error(DISP_IT_EXECD, it);
735         if (rc)
736                 RETURN(rc);
737
738         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
739         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
740
741         /* If we were revalidating a fid/name pair, mark the intent in
742          * case we fail and get called again from lookup */
743         if (fid_is_sane(&op_data->op_fid2) &&
744             it->it_flags & O_CHECK_STALE &&
745             it->it_op != IT_GETATTR) {
746                 it_set_disposition(it, DISP_ENQ_COMPLETE);
747
748                 /* Also: did we find the same inode? */
749                 /* sever can return one of two fids:
750                  * op_fid2 - new allocated fid - if file is created.
751                  * op_fid3 - existent fid - if file only open.
752                  * op_fid3 is saved in lmv_intent_open */
753                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
754                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
755                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
756                                "\n", PFID(&op_data->op_fid2),
757                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
758                         RETURN(-ESTALE);
759                 }
760         }
761
762         rc = it_open_error(DISP_LOOKUP_EXECD, it);
763         if (rc)
764                 RETURN(rc);
765
766         /* keep requests around for the multiple phases of the call
767          * this shows the DISP_XX must guarantee we make it into the call
768          */
769         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
770             it_disposition(it, DISP_OPEN_CREATE) &&
771             !it_open_error(DISP_OPEN_CREATE, it)) {
772                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
773                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
774         }
775         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
776             it_disposition(it, DISP_OPEN_OPEN) &&
777             !it_open_error(DISP_OPEN_OPEN, it)) {
778                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
779                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
780                 /* BUG 11546 - eviction in the middle of open rpc processing */
781                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
782         }
783
784         if (it->it_op & IT_CREAT) {
785                 /* XXX this belongs in ll_create_it */
786         } else if (it->it_op == IT_OPEN) {
787                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
788         } else {
789                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
790         }
791
792         /* If we already have a matching lock, then cancel the new
793          * one.  We have to set the data here instead of in
794          * mdc_enqueue, because we need to use the child's inode as
795          * the l_ast_data to match, and that's not available until
796          * intent_finish has performed the iget().) */
797         lock = ldlm_handle2lock(lockh);
798         if (lock) {
799                 ldlm_policy_data_t policy = lock->l_policy_data;
800                 LDLM_DEBUG(lock, "matching against this");
801
802                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
803                                          &lock->l_resource->lr_name),
804                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
805                          (unsigned long)lock->l_resource->lr_name.name[0],
806                          (unsigned long)lock->l_resource->lr_name.name[1],
807                          (unsigned long)lock->l_resource->lr_name.name[2],
808                          (unsigned long)fid_seq(&mdt_body->fid1),
809                          (unsigned long)fid_oid(&mdt_body->fid1),
810                          (unsigned long)fid_ver(&mdt_body->fid1));
811                 LDLM_LOCK_PUT(lock);
812
813                 memcpy(&old_lock, lockh, sizeof(*lockh));
814                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
815                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
816                         ldlm_lock_decref_and_cancel(lockh,
817                                                     it->d.lustre.it_lock_mode);
818                         memcpy(lockh, &old_lock, sizeof(old_lock));
819                         it->d.lustre.it_lock_handle = lockh->cookie;
820                 }
821         }
822         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
823                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
824                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
825         RETURN(rc);
826 }
827
828 /*
829  * This long block is all about fixing up the lock and request state
830  * so that it is correct as of the moment _before_ the operation was
831  * applied; that way, the VFS will think that everything is normal and
832  * call Lustre's regular VFS methods.
833  *
834  * If we're performing a creation, that means that unless the creation
835  * failed with EEXIST, we should fake up a negative dentry.
836  *
837  * For everything else, we want the lookup to succeed.
838  *
839  * One additional note: if CREATE or OPEN succeeded, we add an extra
840  * reference to the request because we need to keep it around until
841  * ll_create/ll_open gets called.
842  *
843  * The server will return to us, in it_disposition, an indication of
844  * exactly what d.lustre.it_status refers to.
845  *
846  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
847  * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
848  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
849  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
850  * was successful.
851  *
852  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
853  * child lookup.
854  */
855 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
856                     void *lmm, int lmmsize, struct lookup_intent *it,
857                     int lookup_flags, struct ptlrpc_request **reqp,
858                     ldlm_blocking_callback cb_blocking,
859                     int extra_lock_flags)
860 {
861         struct lustre_handle lockh;
862         int rc = 0;
863         ENTRY;
864         LASSERT(it);
865
866         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
867                ", intent: %s flags %#o\n", op_data->op_namelen,
868                op_data->op_name, PFID(&op_data->op_fid2),
869                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
870                it->it_flags);
871
872         lockh.cookie = 0;
873         if (fid_is_sane(&op_data->op_fid2) &&
874             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
875                 /* We could just return 1 immediately, but since we should only
876                  * be called in revalidate_it if we already have a lock, let's
877                  * verify that. */
878                 ldlm_policy_data_t policy;
879                 ldlm_mode_t mode;
880
881                 /* As not all attributes are kept under update lock, e.g.
882                    owner/group/acls are under lookup lock, we need both
883                    ibits for GETATTR. */
884
885                 /* For CMD, UPDATE lock and LOOKUP lock can not be got
886                  * at the same for cross-object, so we can not match
887                  * the 2 lock at the same time FIXME: but how to handle
888                  * the above situation */
889                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
890                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
891
892                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
893                                       &op_data->op_fid2, LDLM_IBITS, &policy,
894                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
895                 if (mode) {
896                         it->d.lustre.it_lock_handle = lockh.cookie;
897                         it->d.lustre.it_lock_mode = mode;
898                 }
899
900                 /* Only return failure if it was not GETATTR by cfid
901                    (from inode_revalidate) */
902                 if (mode || op_data->op_namelen != 0)
903                         RETURN(!!mode);
904         }
905
906         /* lookup_it may be called only after revalidate_it has run, because
907          * revalidate_it cannot return errors, only zero.  Returning zero causes
908          * this call to lookup, which *can* return an error.
909          *
910          * We only want to execute the request associated with the intent one
911          * time, however, so don't send the request again.  Instead, skip past
912          * this and use the request from revalidate.  In this case, revalidate
913          * never dropped its reference, so the refcounts are all OK */
914         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
915                 struct ldlm_enqueue_info einfo =
916                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
917                           ldlm_completion_ast, NULL, NULL, NULL };
918
919                 /* For case if upper layer did not alloc fid, do it now. */
920                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
921                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
922                         if (rc < 0) {
923                                 CERROR("Can't alloc new fid, rc %d\n", rc);
924                                 RETURN(rc);
925                         }
926                 }
927                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
928                                  lmm, lmmsize, NULL, extra_lock_flags);
929                 if (rc < 0)
930                         RETURN(rc);
931                 it->d.lustre.it_lock_handle = lockh.cookie;
932         } else if (!fid_is_sane(&op_data->op_fid2) ||
933                    !(it->it_flags & O_CHECK_STALE)) {
934                 /* DISP_ENQ_COMPLETE set means there is extra reference on
935                  * request referenced from this intent, saved for subsequent
936                  * lookup.  This path is executed when we proceed to this
937                  * lookup, so we clear DISP_ENQ_COMPLETE */
938                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
939         }
940         *reqp = it->d.lustre.it_data;
941         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
942         RETURN(rc);
943 }
944
945 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
946                                               void *unused, int rc)
947 {
948         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
949         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
950         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
951         struct lookup_intent     *it;
952         struct lustre_handle     *lockh;
953         struct obd_device        *obddev;
954         int                       flags = LDLM_FL_HAS_INTENT;
955         ENTRY;
956
957         it    = &minfo->mi_it;
958         lockh = &minfo->mi_lockh;
959
960         obddev = class_exp2obd(exp);
961
962         mdc_exit_request(&obddev->u.cli);
963         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
964                 rc = -ETIMEDOUT;
965
966         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
967                                    &flags, NULL, 0, NULL, lockh, rc);
968         if (rc < 0) {
969                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
970                 mdc_clear_replay_flag(req, rc);
971                 GOTO(out, rc);
972         }
973
974         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
975         if (rc)
976                 GOTO(out, rc);
977
978         it->d.lustre.it_lock_handle = lockh->cookie;
979
980         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
981         EXIT;
982
983 out:
984         OBD_FREE_PTR(einfo);
985         minfo->mi_cb(req, minfo, rc);
986         return 0;
987 }
988
989 int mdc_intent_getattr_async(struct obd_export *exp,
990                              struct md_enqueue_info *minfo,
991                              struct ldlm_enqueue_info *einfo)
992 {
993         struct md_op_data       *op_data = &minfo->mi_data;
994         struct lookup_intent    *it = &minfo->mi_it;
995         struct ptlrpc_request   *req;
996         struct obd_device       *obddev = class_exp2obd(exp);
997         struct ldlm_res_id       res_id;
998         ldlm_policy_data_t       policy = {
999                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
1000                                  };
1001         int                      rc;
1002         int                      flags = LDLM_FL_HAS_INTENT;
1003         ENTRY;
1004
1005         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1006                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1007                ldlm_it2str(it->it_op), it->it_flags);
1008
1009         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1010         req = mdc_intent_getattr_pack(exp, it, op_data);
1011         if (!req)
1012                 RETURN(-ENOMEM);
1013
1014         mdc_enter_request(&obddev->u.cli);
1015         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1016                               0, NULL, &minfo->mi_lockh, 1);
1017         if (rc < 0) {
1018                 mdc_exit_request(&obddev->u.cli);
1019                 RETURN(rc);
1020         }
1021
1022         req->rq_async_args.pointer_arg[0] = exp;
1023         req->rq_async_args.pointer_arg[1] = minfo;
1024         req->rq_async_args.pointer_arg[2] = einfo;
1025         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1026         ptlrpcd_add_req(req);
1027
1028         RETURN(0);
1029 }
1030
1031 int mdc_revalidate_lock(struct obd_export *exp,
1032                         struct lookup_intent *it,
1033                         struct lu_fid *fid)
1034 {
1035         /* We could just return 1 immediately, but since we should only
1036          * be called in revalidate_it if we already have a lock, let's
1037          * verify that. */
1038         struct ldlm_res_id res_id;
1039         struct lustre_handle lockh;
1040         ldlm_policy_data_t policy;
1041         ldlm_mode_t mode;
1042         ENTRY;
1043
1044         fid_build_reg_res_name(fid, &res_id);
1045         /* As not all attributes are kept under update lock, e.g. 
1046            owner/group/acls are under lookup lock, we need both 
1047            ibits for GETATTR. */
1048         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1049                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1050                 MDS_INODELOCK_LOOKUP;
1051
1052         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1053                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1054                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
1055         if (mode) {
1056                 it->d.lustre.it_lock_handle = lockh.cookie;
1057                 it->d.lustre.it_lock_mode = mode;
1058         }
1059
1060         RETURN(!!mode);
1061 }