Whamcloud - gitweb
b=23399 Deadlock probably due to statahead
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode,
173                       ldlm_cancel_flags_t flags,
174                       void *opaque)
175 {
176         struct ldlm_res_id res_id;
177         struct obd_device *obd = class_exp2obd(exp);
178         int rc;
179
180         ENTRY;
181
182         fid_build_reg_res_name(fid, &res_id);
183         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
184                                              policy, mode, flags, opaque);
185         RETURN(rc);
186 }
187
188 int mdc_change_cbdata(struct obd_export *exp,
189                       const struct lu_fid *fid,
190                       ldlm_iterator_t it, void *data)
191 {
192         struct ldlm_res_id res_id;
193         ENTRY;
194
195         fid_build_reg_res_name(fid, &res_id);
196         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
197                               &res_id, it, data);
198
199         EXIT;
200         return 0;
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 cfs_spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 cfs_spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267                                                    struct lookup_intent *it,
268                                                    struct md_op_data *op_data,
269                                                    void *lmm, int lmmsize,
270                                                    void *cb_data)
271 {
272         struct ptlrpc_request *req;
273         struct obd_device     *obddev = class_exp2obd(exp);
274         struct ldlm_intent    *lit;
275         CFS_LIST_HEAD(cancels);
276         int                    count = 0;
277         int                    mode;
278         int                    rc;
279         ENTRY;
280
281         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
282
283         /* XXX: openlock is not cancelled for cross-refs. */
284         /* If inode is known, cancel conflicting OPEN locks. */
285         if (fid_is_sane(&op_data->op_fid2)) {
286                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287                         mode = LCK_CW;
288 #ifdef FMODE_EXEC
289                 else if (it->it_flags & FMODE_EXEC)
290                         mode = LCK_PR;
291 #endif
292                 else
293                         mode = LCK_CR;
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         /* parent capability */
316         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317         /* child capability, reserve the size according to parent capa, it will
318          * be filled after we get the reply */
319         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
320
321         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322                              op_data->op_namelen + 1);
323         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
325
326         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 return NULL;
330         }
331
332         cfs_spin_lock(&req->rq_lock);
333         req->rq_replay = req->rq_import->imp_replayable;
334         cfs_spin_unlock(&req->rq_lock);
335
336         /* pack the intent */
337         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338         lit->opc = (__u64)it->it_op;
339
340         /* pack the intended request */
341         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342                       lmmsize);
343
344         /* for remote client, fetch remote perm for current user */
345         if (client_is_remote(exp))
346                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
347                                      sizeof(struct mdt_remote_perm));
348         ptlrpc_request_set_replen(req);
349         return req;
350 }
351
352 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
353                                                      struct lookup_intent *it,
354                                                      struct md_op_data *op_data)
355 {
356         struct ptlrpc_request *req;
357         struct obd_device     *obddev = class_exp2obd(exp);
358         struct ldlm_intent    *lit;
359         int                    rc;
360         ENTRY;
361
362         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363                                    &RQF_LDLM_INTENT_UNLINK);
364         if (req == NULL)
365                 RETURN(ERR_PTR(-ENOMEM));
366
367         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
368         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
369                              op_data->op_namelen + 1);
370
371         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
372         if (rc) {
373                 ptlrpc_request_free(req);
374                 RETURN(ERR_PTR(rc));
375         }
376
377         /* pack the intent */
378         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379         lit->opc = (__u64)it->it_op;
380
381         /* pack the intended request */
382         mdc_unlink_pack(req, op_data);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
385                              obddev->u.cli.cl_max_mds_easize);
386         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
387                              obddev->u.cli.cl_max_mds_cookiesize);
388         ptlrpc_request_set_replen(req);
389         RETURN(req);
390 }
391
392 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
393                                                       struct lookup_intent *it,
394                                                       struct md_op_data *op_data)
395 {
396         struct ptlrpc_request *req;
397         struct obd_device     *obddev = class_exp2obd(exp);
398         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
399                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
400                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
401                                        (client_is_remote(exp) ?
402                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_GETATTR);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
413         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414                              op_data->op_namelen + 1);
415
416         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417         if (rc) {
418                 ptlrpc_request_free(req);
419                 RETURN(ERR_PTR(rc));
420         }
421
422         /* pack the intent */
423         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424         lit->opc = (__u64)it->it_op;
425
426         /* pack the intended request */
427         mdc_getattr_pack(req, valid, it->it_flags, op_data);
428
429         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
430                              obddev->u.cli.cl_max_mds_easize);
431         if (client_is_remote(exp))
432                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
433                                      sizeof(struct mdt_remote_perm));
434         ptlrpc_request_set_replen(req);
435         RETURN(req);
436 }
437
438 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
439 {
440         struct ptlrpc_request *req;
441         int rc;
442         ENTRY;
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
445         if (req == NULL)
446                 RETURN(ERR_PTR(-ENOMEM));
447
448         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 RETURN(ERR_PTR(rc));
452         }
453
454         ptlrpc_request_set_replen(req);
455         RETURN(req);
456 }
457
458 static int mdc_finish_enqueue(struct obd_export *exp,
459                               struct ptlrpc_request *req,
460                               struct ldlm_enqueue_info *einfo,
461                               struct lookup_intent *it,
462                               struct lustre_handle *lockh,
463                               int rc)
464 {
465         struct req_capsule  *pill = &req->rq_pill;
466         struct ldlm_request *lockreq;
467         struct ldlm_reply   *lockrep;
468         ENTRY;
469
470         LASSERT(rc >= 0);
471         /* Similarly, if we're going to replay this request, we don't want to
472          * actually get a lock, just perform the intent. */
473         if (req->rq_transno || req->rq_replay) {
474                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
475                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
476         }
477
478         if (rc == ELDLM_LOCK_ABORTED) {
479                 einfo->ei_mode = 0;
480                 memset(lockh, 0, sizeof(*lockh));
481                 rc = 0;
482         } else { /* rc = 0 */
483                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
484                 LASSERT(lock);
485
486                 /* If the server gave us back a different lock mode, we should
487                  * fix up our variables. */
488                 if (lock->l_req_mode != einfo->ei_mode) {
489                         ldlm_lock_addref(lockh, lock->l_req_mode);
490                         ldlm_lock_decref(lockh, einfo->ei_mode);
491                         einfo->ei_mode = lock->l_req_mode;
492                 }
493                 LDLM_LOCK_PUT(lock);
494         }
495
496         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
497         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
498
499         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
500         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
501         it->d.lustre.it_lock_mode = einfo->ei_mode;
502         it->d.lustre.it_lock_handle = lockh->cookie;
503         it->d.lustre.it_data = req;
504
505         if (it->d.lustre.it_status < 0 && req->rq_replay)
506                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
507
508         /* If we're doing an IT_OPEN which did not result in an actual
509          * successful open, then we need to remove the bit which saves
510          * this request for unconditional replay.
511          *
512          * It's important that we do this first!  Otherwise we might exit the
513          * function without doing so, and try to replay a failed create
514          * (bug 3440) */
515         if (it->it_op & IT_OPEN && req->rq_replay &&
516             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
517                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
518
519         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
520                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
521
522         /* We know what to expect, so we do any byte flipping required here */
523         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
524                 struct mdt_body *body;
525
526                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
527                 if (body == NULL) {
528                         CERROR ("Can't swab mdt_body\n");
529                         RETURN (-EPROTO);
530                 }
531
532                 if (it_disposition(it, DISP_OPEN_OPEN) &&
533                     !it_open_error(DISP_OPEN_OPEN, it)) {
534                         /*
535                          * If this is a successful OPEN request, we need to set
536                          * replay handler and data early, so that if replay
537                          * happens immediately after swabbing below, new reply
538                          * is swabbed by that handler correctly.
539                          */
540                         mdc_set_open_replay_data(NULL, NULL, req);
541                 }
542
543                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
544                         void *eadata;
545
546                          mdc_update_max_ea_from_body(exp, body);
547
548                         /*
549                          * The eadata is opaque; just check that it is there.
550                          * Eventually, obd_unpackmd() will check the contents.
551                          */
552                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
553                                                               body->eadatasize);
554                         if (eadata == NULL)
555                                 RETURN(-EPROTO);
556
557                         /*
558                          * We save the reply LOV EA in case we have to replay a
559                          * create for recovery.  If we didn't allocate a large
560                          * enough request buffer above we need to reallocate it
561                          * here to hold the actual LOV EA.
562                          *
563                          * To not save LOV EA if request is not going to replay
564                          * (for example error one).
565                          */
566                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
567                                 void *lmm;
568                                 if (req_capsule_get_size(pill, &RMF_EADATA,
569                                                          RCL_CLIENT) <
570                                     body->eadatasize)
571                                         mdc_realloc_openmsg(req, body);
572                                 else
573                                         req_capsule_shrink(pill, &RMF_EADATA,
574                                                            body->eadatasize,
575                                                            RCL_CLIENT);
576
577                                 req_capsule_set_size(pill, &RMF_EADATA,
578                                                      RCL_CLIENT,
579                                                      body->eadatasize);
580
581                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
582                                 if (lmm)
583                                         memcpy(lmm, eadata, body->eadatasize);
584                         }
585                 }
586
587                 if (body->valid & OBD_MD_FLRMTPERM) {
588                         struct mdt_remote_perm *perm;
589
590                         LASSERT(client_is_remote(exp));
591                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
592                                                 lustre_swab_mdt_remote_perm);
593                         if (perm == NULL)
594                                 RETURN(-EPROTO);
595                 }
596                 if (body->valid & OBD_MD_FLMDSCAPA) {
597                         struct lustre_capa *capa, *p;
598
599                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
600                         if (capa == NULL)
601                                 RETURN(-EPROTO);
602
603                         if (it->it_op & IT_OPEN) {
604                                 /* client fid capa will be checked in replay */
605                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
606                                 LASSERT(p);
607                                 *p = *capa;
608                         }
609                 }
610                 if (body->valid & OBD_MD_FLOSSCAPA) {
611                         struct lustre_capa *capa;
612
613                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
614                         if (capa == NULL)
615                                 RETURN(-EPROTO);
616                 }
617         }
618
619         RETURN(rc);
620 }
621
622 /* We always reserve enough space in the reply packet for a stripe MD, because
623  * we don't know in advance the file type. */
624 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
625                 struct lookup_intent *it, struct md_op_data *op_data,
626                 struct lustre_handle *lockh, void *lmm, int lmmsize,
627                 struct ptlrpc_request **reqp, int extra_lock_flags)
628 {
629         struct obd_device     *obddev = class_exp2obd(exp);
630         struct ptlrpc_request *req = NULL;
631         struct req_capsule    *pill;
632         int                    flags = extra_lock_flags;
633         int                    rc;
634         struct ldlm_res_id res_id;
635         static const ldlm_policy_data_t lookup_policy =
636                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
637         static const ldlm_policy_data_t update_policy =
638                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
639         ldlm_policy_data_t const *policy = &lookup_policy;
640         ENTRY;
641
642         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
643                  einfo->ei_type);
644
645         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
646
647         if (it)
648                 flags |= LDLM_FL_HAS_INTENT;
649         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
650                 policy = &update_policy;
651
652         if (reqp)
653                 req = *reqp;
654
655         if (!it) {
656                 /* The only way right now is FLOCK, in this case we hide flock
657                    policy as lmm, but lmmsize is 0 */
658                 LASSERT(lmm && lmmsize == 0);
659                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
660                          einfo->ei_type);
661                 policy = (ldlm_policy_data_t *)lmm;
662                 res_id.name[3] = LDLM_FLOCK;
663         } else if (it->it_op & IT_OPEN) {
664                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
665                                            einfo->ei_cbdata);
666                 policy = &update_policy;
667                 einfo->ei_cbdata = NULL;
668                 lmm = NULL;
669         } else if (it->it_op & IT_UNLINK)
670                 req = mdc_intent_unlink_pack(exp, it, op_data);
671         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
672                 req = mdc_intent_getattr_pack(exp, it, op_data);
673         else if (it->it_op == IT_READDIR)
674                 req = ldlm_enqueue_pack(exp);
675         else {
676                 LBUG();
677                 RETURN(-EINVAL);
678         }
679
680         if (IS_ERR(req))
681                 RETURN(PTR_ERR(req));
682         pill = &req->rq_pill;
683
684         /* It is important to obtain rpc_lock first (if applicable), so that
685          * threads that are serialised with rpc_lock are not polluting our
686          * rpcs in flight counter. We do not do flock request limiting, though*/
687         if (it) {
688                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
689                 mdc_enter_request(&obddev->u.cli);
690         }
691
692         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
693                               0, lockh, 0);
694         if (reqp)
695                 *reqp = req;
696
697         if (it) {
698                 mdc_exit_request(&obddev->u.cli);
699                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
700         }
701         if (!it) {
702                 /* For flock requests we immediatelly return without further
703                    delay and let caller deal with the rest, since rest of
704                    this function metadata processing makes no sense for flock
705                    requests anyway */
706                 RETURN(rc);
707         }
708
709         if (rc < 0) {
710                 CERROR("ldlm_cli_enqueue: %d\n", rc);
711                 mdc_clear_replay_flag(req, rc);
712                 ptlrpc_req_finished(req);
713                 RETURN(rc);
714         }
715         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
716
717         RETURN(rc);
718 }
719
720 static int mdc_finish_intent_lock(struct obd_export *exp,
721                                   struct ptlrpc_request *request,
722                                   struct md_op_data *op_data,
723                                   struct lookup_intent *it,
724                                   struct lustre_handle *lockh)
725 {
726         struct lustre_handle old_lock;
727         struct mdt_body *mdt_body;
728         struct ldlm_lock *lock;
729         int rc;
730
731
732         LASSERT(request != NULL);
733         LASSERT(request != LP_POISON);
734         LASSERT(request->rq_repmsg != LP_POISON);
735
736         if (!it_disposition(it, DISP_IT_EXECD)) {
737                 /* The server failed before it even started executing the
738                  * intent, i.e. because it couldn't unpack the request. */
739                 LASSERT(it->d.lustre.it_status != 0);
740                 RETURN(it->d.lustre.it_status);
741         }
742         rc = it_open_error(DISP_IT_EXECD, it);
743         if (rc)
744                 RETURN(rc);
745
746         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
747         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
748
749         /* If we were revalidating a fid/name pair, mark the intent in
750          * case we fail and get called again from lookup */
751         if (fid_is_sane(&op_data->op_fid2) &&
752             it->it_create_mode & M_CHECK_STALE &&
753             it->it_op != IT_GETATTR) {
754                 it_set_disposition(it, DISP_ENQ_COMPLETE);
755
756                 /* Also: did we find the same inode? */
757                 /* sever can return one of two fids:
758                  * op_fid2 - new allocated fid - if file is created.
759                  * op_fid3 - existent fid - if file only open.
760                  * op_fid3 is saved in lmv_intent_open */
761                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
762                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
763                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
764                                "\n", PFID(&op_data->op_fid2),
765                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
766                         RETURN(-ESTALE);
767                 }
768         }
769
770         rc = it_open_error(DISP_LOOKUP_EXECD, it);
771         if (rc)
772                 RETURN(rc);
773
774         /* keep requests around for the multiple phases of the call
775          * this shows the DISP_XX must guarantee we make it into the call
776          */
777         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
778             it_disposition(it, DISP_OPEN_CREATE) &&
779             !it_open_error(DISP_OPEN_CREATE, it)) {
780                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
781                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
782         }
783         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
784             it_disposition(it, DISP_OPEN_OPEN) &&
785             !it_open_error(DISP_OPEN_OPEN, it)) {
786                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
787                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
788                 /* BUG 11546 - eviction in the middle of open rpc processing */
789                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
790         }
791
792         if (it->it_op & IT_CREAT) {
793                 /* XXX this belongs in ll_create_it */
794         } else if (it->it_op == IT_OPEN) {
795                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
796         } else {
797                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
798         }
799
800         /* If we already have a matching lock, then cancel the new
801          * one.  We have to set the data here instead of in
802          * mdc_enqueue, because we need to use the child's inode as
803          * the l_ast_data to match, and that's not available until
804          * intent_finish has performed the iget().) */
805         lock = ldlm_handle2lock(lockh);
806         if (lock) {
807                 ldlm_policy_data_t policy = lock->l_policy_data;
808                 LDLM_DEBUG(lock, "matching against this");
809
810                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
811                                          &lock->l_resource->lr_name),
812                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
813                          (unsigned long)lock->l_resource->lr_name.name[0],
814                          (unsigned long)lock->l_resource->lr_name.name[1],
815                          (unsigned long)lock->l_resource->lr_name.name[2],
816                          (unsigned long)fid_seq(&mdt_body->fid1),
817                          (unsigned long)fid_oid(&mdt_body->fid1),
818                          (unsigned long)fid_ver(&mdt_body->fid1));
819                 LDLM_LOCK_PUT(lock);
820
821                 memcpy(&old_lock, lockh, sizeof(*lockh));
822                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
823                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
824                         ldlm_lock_decref_and_cancel(lockh,
825                                                     it->d.lustre.it_lock_mode);
826                         memcpy(lockh, &old_lock, sizeof(old_lock));
827                         it->d.lustre.it_lock_handle = lockh->cookie;
828                 }
829         }
830         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
831                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
832                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
833         RETURN(rc);
834 }
835
836 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
837                         struct lu_fid *fid)
838 {
839         /* We could just return 1 immediately, but since we should only
840          * be called in revalidate_it if we already have a lock, let's
841          * verify that. */
842         struct ldlm_res_id res_id;
843         struct lustre_handle lockh;
844         ldlm_policy_data_t policy;
845         ldlm_mode_t mode;
846         ENTRY;
847
848         fid_build_reg_res_name(fid, &res_id);
849         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
850                                   MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
851
852         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
853                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
854                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
855         if (mode) {
856                 it->d.lustre.it_lock_handle = lockh.cookie;
857                 it->d.lustre.it_lock_mode = mode;
858         }
859
860         RETURN(!!mode);
861 }
862
863 /*
864  * This long block is all about fixing up the lock and request state
865  * so that it is correct as of the moment _before_ the operation was
866  * applied; that way, the VFS will think that everything is normal and
867  * call Lustre's regular VFS methods.
868  *
869  * If we're performing a creation, that means that unless the creation
870  * failed with EEXIST, we should fake up a negative dentry.
871  *
872  * For everything else, we want to lookup to succeed.
873  *
874  * One additional note: if CREATE or OPEN succeeded, we add an extra
875  * reference to the request because we need to keep it around until
876  * ll_create/ll_open gets called.
877  *
878  * The server will return to us, in it_disposition, an indication of
879  * exactly what d.lustre.it_status refers to.
880  *
881  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
882  * otherwise if DISP_OPEN_CREATE is set, then it status is the
883  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
884  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
885  * was successful.
886  *
887  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
888  * child lookup.
889  */
890 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
891                     void *lmm, int lmmsize, struct lookup_intent *it,
892                     int lookup_flags, struct ptlrpc_request **reqp,
893                     ldlm_blocking_callback cb_blocking,
894                     int extra_lock_flags)
895 {
896         struct lustre_handle lockh;
897         int rc = 0;
898         ENTRY;
899         LASSERT(it);
900
901         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
902                ", intent: %s flags %#o\n", op_data->op_namelen,
903                op_data->op_name, PFID(&op_data->op_fid2),
904                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
905                it->it_flags);
906
907         lockh.cookie = 0;
908         if (fid_is_sane(&op_data->op_fid2) &&
909             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
910                 /* We could just return 1 immediately, but since we should only
911                  * be called in revalidate_it if we already have a lock, let's
912                  * verify that. */
913                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
914                 /* Only return failure if it was not GETATTR by cfid
915                    (from inode_revalidate) */
916                 if (rc || op_data->op_namelen != 0)
917                         RETURN(rc);
918         }
919
920         /* lookup_it may be called only after revalidate_it has run, because
921          * revalidate_it cannot return errors, only zero.  Returning zero causes
922          * this call to lookup, which *can* return an error.
923          *
924          * We only want to execute the request associated with the intent one
925          * time, however, so don't send the request again.  Instead, skip past
926          * this and use the request from revalidate.  In this case, revalidate
927          * never dropped its reference, so the refcounts are all OK */
928         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
929                 struct ldlm_enqueue_info einfo =
930                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
931                           ldlm_completion_ast, NULL, NULL, NULL };
932
933                 /* For case if upper layer did not alloc fid, do it now. */
934                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
935                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
936                         if (rc < 0) {
937                                 CERROR("Can't alloc new fid, rc %d\n", rc);
938                                 RETURN(rc);
939                         }
940                 }
941                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
942                                  lmm, lmmsize, NULL, extra_lock_flags);
943                 if (rc < 0)
944                         RETURN(rc);
945         } else if (!fid_is_sane(&op_data->op_fid2) ||
946                    !(it->it_create_mode & M_CHECK_STALE)) {
947                 /* DISP_ENQ_COMPLETE set means there is extra reference on
948                  * request referenced from this intent, saved for subsequent
949                  * lookup.  This path is executed when we proceed to this
950                  * lookup, so we clear DISP_ENQ_COMPLETE */
951                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
952         }
953         *reqp = it->d.lustre.it_data;
954         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
955         RETURN(rc);
956 }
957
958 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
959                                               struct ptlrpc_request *req,
960                                               void *unused, int rc)
961 {
962         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
963         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
964         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
965         struct lookup_intent     *it;
966         struct lustre_handle     *lockh;
967         struct obd_device        *obddev;
968         int                       flags = LDLM_FL_HAS_INTENT;
969         ENTRY;
970
971         it    = &minfo->mi_it;
972         lockh = &minfo->mi_lockh;
973
974         obddev = class_exp2obd(exp);
975
976         mdc_exit_request(&obddev->u.cli);
977         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
978                 rc = -ETIMEDOUT;
979
980         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
981                                    &flags, NULL, 0, lockh, rc);
982         if (rc < 0) {
983                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
984                 mdc_clear_replay_flag(req, rc);
985                 GOTO(out, rc);
986         }
987
988         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
989         if (rc)
990                 GOTO(out, rc);
991
992         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
993         EXIT;
994
995 out:
996         OBD_FREE_PTR(einfo);
997         minfo->mi_cb(req, minfo, rc);
998         return 0;
999 }
1000
1001 int mdc_intent_getattr_async(struct obd_export *exp,
1002                              struct md_enqueue_info *minfo,
1003                              struct ldlm_enqueue_info *einfo)
1004 {
1005         struct md_op_data       *op_data = &minfo->mi_data;
1006         struct lookup_intent    *it = &minfo->mi_it;
1007         struct ptlrpc_request   *req;
1008         struct obd_device       *obddev = class_exp2obd(exp);
1009         struct ldlm_res_id       res_id;
1010         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1011          *     for statahead currently. Consider CMD in future, such two bits
1012          *     maybe managed by different MDS, should be adjusted then. */
1013         ldlm_policy_data_t       policy = {
1014                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1015                                                          MDS_INODELOCK_UPDATE }
1016                                  };
1017         int                      rc;
1018         int                      flags = LDLM_FL_HAS_INTENT;
1019         ENTRY;
1020
1021         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1022                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1023                ldlm_it2str(it->it_op), it->it_flags);
1024
1025         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1026         req = mdc_intent_getattr_pack(exp, it, op_data);
1027         if (!req)
1028                 RETURN(-ENOMEM);
1029
1030         mdc_enter_request(&obddev->u.cli);
1031         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1032                               0, &minfo->mi_lockh, 1);
1033         if (rc < 0) {
1034                 mdc_exit_request(&obddev->u.cli);
1035                 RETURN(rc);
1036         }
1037
1038         req->rq_async_args.pointer_arg[0] = exp;
1039         req->rq_async_args.pointer_arg[1] = minfo;
1040         req->rq_async_args.pointer_arg[2] = einfo;
1041         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1042         ptlrpcd_add_req(req, PSCOPE_OTHER);
1043
1044         RETURN(0);
1045 }