Whamcloud - gitweb
LU-82 Remove useless clio locks
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode,
173                       ldlm_cancel_flags_t flags,
174                       void *opaque)
175 {
176         struct ldlm_res_id res_id;
177         struct obd_device *obd = class_exp2obd(exp);
178         int rc;
179
180         ENTRY;
181
182         fid_build_reg_res_name(fid, &res_id);
183         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
184                                              policy, mode, flags, opaque);
185         RETURN(rc);
186 }
187
188 int mdc_change_cbdata(struct obd_export *exp,
189                       const struct lu_fid *fid,
190                       ldlm_iterator_t it, void *data)
191 {
192         struct ldlm_res_id res_id;
193         ENTRY;
194
195         fid_build_reg_res_name(fid, &res_id);
196         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
197                               &res_id, it, data);
198
199         EXIT;
200         return 0;
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 cfs_spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 cfs_spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267                                                    struct lookup_intent *it,
268                                                    struct md_op_data *op_data,
269                                                    void *lmm, int lmmsize,
270                                                    void *cb_data)
271 {
272         struct ptlrpc_request *req;
273         struct obd_device     *obddev = class_exp2obd(exp);
274         struct ldlm_intent    *lit;
275         CFS_LIST_HEAD(cancels);
276         int                    count = 0;
277         int                    mode;
278         int                    rc;
279         ENTRY;
280
281         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
282
283         /* XXX: openlock is not cancelled for cross-refs. */
284         /* If inode is known, cancel conflicting OPEN locks. */
285         if (fid_is_sane(&op_data->op_fid2)) {
286                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287                         mode = LCK_CW;
288 #ifdef FMODE_EXEC
289                 else if (it->it_flags & FMODE_EXEC)
290                         mode = LCK_PR;
291 #endif
292                 else
293                         mode = LCK_CR;
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         /* parent capability */
316         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317         /* child capability, reserve the size according to parent capa, it will
318          * be filled after we get the reply */
319         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
320
321         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322                              op_data->op_namelen + 1);
323         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
325
326         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 return NULL;
330         }
331
332         cfs_spin_lock(&req->rq_lock);
333         req->rq_replay = req->rq_import->imp_replayable;
334         cfs_spin_unlock(&req->rq_lock);
335
336         /* pack the intent */
337         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338         lit->opc = (__u64)it->it_op;
339
340         /* pack the intended request */
341         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342                       lmmsize);
343
344         /* for remote client, fetch remote perm for current user */
345         if (client_is_remote(exp))
346                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
347                                      sizeof(struct mdt_remote_perm));
348         ptlrpc_request_set_replen(req);
349         return req;
350 }
351
352 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
353                                                      struct lookup_intent *it,
354                                                      struct md_op_data *op_data)
355 {
356         struct ptlrpc_request *req;
357         struct obd_device     *obddev = class_exp2obd(exp);
358         struct ldlm_intent    *lit;
359         int                    rc;
360         ENTRY;
361
362         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363                                    &RQF_LDLM_INTENT_UNLINK);
364         if (req == NULL)
365                 RETURN(ERR_PTR(-ENOMEM));
366
367         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
368         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
369                              op_data->op_namelen + 1);
370
371         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
372         if (rc) {
373                 ptlrpc_request_free(req);
374                 RETURN(ERR_PTR(rc));
375         }
376
377         /* pack the intent */
378         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379         lit->opc = (__u64)it->it_op;
380
381         /* pack the intended request */
382         mdc_unlink_pack(req, op_data);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
385                              obddev->u.cli.cl_max_mds_easize);
386         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
387                              obddev->u.cli.cl_max_mds_cookiesize);
388         ptlrpc_request_set_replen(req);
389         RETURN(req);
390 }
391
392 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
393                                                       struct lookup_intent *it,
394                                                       struct md_op_data *op_data)
395 {
396         struct ptlrpc_request *req;
397         struct obd_device     *obddev = class_exp2obd(exp);
398         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
399                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
400                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
401                                        (client_is_remote(exp) ?
402                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_GETATTR);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
413         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414                              op_data->op_namelen + 1);
415
416         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417         if (rc) {
418                 ptlrpc_request_free(req);
419                 RETURN(ERR_PTR(rc));
420         }
421
422         /* pack the intent */
423         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424         lit->opc = (__u64)it->it_op;
425
426         /* pack the intended request */
427         mdc_getattr_pack(req, valid, it->it_flags, op_data,
428                          obddev->u.cli.cl_max_mds_easize);
429
430         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
431                              obddev->u.cli.cl_max_mds_easize);
432         if (client_is_remote(exp))
433                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
434                                      sizeof(struct mdt_remote_perm));
435         ptlrpc_request_set_replen(req);
436         RETURN(req);
437 }
438
439 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
440 {
441         struct ptlrpc_request *req;
442         int rc;
443         ENTRY;
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
446         if (req == NULL)
447                 RETURN(ERR_PTR(-ENOMEM));
448
449         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 RETURN(ERR_PTR(rc));
453         }
454
455         ptlrpc_request_set_replen(req);
456         RETURN(req);
457 }
458
459 static int mdc_finish_enqueue(struct obd_export *exp,
460                               struct ptlrpc_request *req,
461                               struct ldlm_enqueue_info *einfo,
462                               struct lookup_intent *it,
463                               struct lustre_handle *lockh,
464                               int rc)
465 {
466         struct req_capsule  *pill = &req->rq_pill;
467         struct ldlm_request *lockreq;
468         struct ldlm_reply   *lockrep;
469         ENTRY;
470
471         LASSERT(rc >= 0);
472         /* Similarly, if we're going to replay this request, we don't want to
473          * actually get a lock, just perform the intent. */
474         if (req->rq_transno || req->rq_replay) {
475                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
476                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
477         }
478
479         if (rc == ELDLM_LOCK_ABORTED) {
480                 einfo->ei_mode = 0;
481                 memset(lockh, 0, sizeof(*lockh));
482                 rc = 0;
483         } else { /* rc = 0 */
484                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
485                 LASSERT(lock);
486
487                 /* If the server gave us back a different lock mode, we should
488                  * fix up our variables. */
489                 if (lock->l_req_mode != einfo->ei_mode) {
490                         ldlm_lock_addref(lockh, lock->l_req_mode);
491                         ldlm_lock_decref(lockh, einfo->ei_mode);
492                         einfo->ei_mode = lock->l_req_mode;
493                 }
494                 LDLM_LOCK_PUT(lock);
495         }
496
497         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
498         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
499
500         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
501         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
502         it->d.lustre.it_lock_mode = einfo->ei_mode;
503         it->d.lustre.it_lock_handle = lockh->cookie;
504         it->d.lustre.it_data = req;
505
506         if (it->d.lustre.it_status < 0 && req->rq_replay)
507                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
508
509         /* If we're doing an IT_OPEN which did not result in an actual
510          * successful open, then we need to remove the bit which saves
511          * this request for unconditional replay.
512          *
513          * It's important that we do this first!  Otherwise we might exit the
514          * function without doing so, and try to replay a failed create
515          * (bug 3440) */
516         if (it->it_op & IT_OPEN && req->rq_replay &&
517             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
518                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
519
520         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
521                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
522
523         /* We know what to expect, so we do any byte flipping required here */
524         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
525                 struct mdt_body *body;
526
527                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
528                 if (body == NULL) {
529                         CERROR ("Can't swab mdt_body\n");
530                         RETURN (-EPROTO);
531                 }
532
533                 if (it_disposition(it, DISP_OPEN_OPEN) &&
534                     !it_open_error(DISP_OPEN_OPEN, it)) {
535                         /*
536                          * If this is a successful OPEN request, we need to set
537                          * replay handler and data early, so that if replay
538                          * happens immediately after swabbing below, new reply
539                          * is swabbed by that handler correctly.
540                          */
541                         mdc_set_open_replay_data(NULL, NULL, req);
542                 }
543
544                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
545                         void *eadata;
546
547                          mdc_update_max_ea_from_body(exp, body);
548
549                         /*
550                          * The eadata is opaque; just check that it is there.
551                          * Eventually, obd_unpackmd() will check the contents.
552                          */
553                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
554                                                               body->eadatasize);
555                         if (eadata == NULL)
556                                 RETURN(-EPROTO);
557
558                         /*
559                          * We save the reply LOV EA in case we have to replay a
560                          * create for recovery.  If we didn't allocate a large
561                          * enough request buffer above we need to reallocate it
562                          * here to hold the actual LOV EA.
563                          *
564                          * To not save LOV EA if request is not going to replay
565                          * (for example error one).
566                          */
567                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
568                                 void *lmm;
569                                 if (req_capsule_get_size(pill, &RMF_EADATA,
570                                                          RCL_CLIENT) <
571                                     body->eadatasize)
572                                         mdc_realloc_openmsg(req, body);
573                                 else
574                                         req_capsule_shrink(pill, &RMF_EADATA,
575                                                            body->eadatasize,
576                                                            RCL_CLIENT);
577
578                                 req_capsule_set_size(pill, &RMF_EADATA,
579                                                      RCL_CLIENT,
580                                                      body->eadatasize);
581
582                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
583                                 if (lmm)
584                                         memcpy(lmm, eadata, body->eadatasize);
585                         }
586                 }
587
588                 if (body->valid & OBD_MD_FLRMTPERM) {
589                         struct mdt_remote_perm *perm;
590
591                         LASSERT(client_is_remote(exp));
592                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
593                                                 lustre_swab_mdt_remote_perm);
594                         if (perm == NULL)
595                                 RETURN(-EPROTO);
596                 }
597                 if (body->valid & OBD_MD_FLMDSCAPA) {
598                         struct lustre_capa *capa, *p;
599
600                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
601                         if (capa == NULL)
602                                 RETURN(-EPROTO);
603
604                         if (it->it_op & IT_OPEN) {
605                                 /* client fid capa will be checked in replay */
606                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
607                                 LASSERT(p);
608                                 *p = *capa;
609                         }
610                 }
611                 if (body->valid & OBD_MD_FLOSSCAPA) {
612                         struct lustre_capa *capa;
613
614                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
615                         if (capa == NULL)
616                                 RETURN(-EPROTO);
617                 }
618         }
619
620         RETURN(rc);
621 }
622
623 /* We always reserve enough space in the reply packet for a stripe MD, because
624  * we don't know in advance the file type. */
625 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
626                 struct lookup_intent *it, struct md_op_data *op_data,
627                 struct lustre_handle *lockh, void *lmm, int lmmsize,
628                 struct ptlrpc_request **reqp, int extra_lock_flags)
629 {
630         struct obd_device     *obddev = class_exp2obd(exp);
631         struct ptlrpc_request *req = NULL;
632         struct req_capsule    *pill;
633         int                    flags = extra_lock_flags;
634         int                    rc;
635         struct ldlm_res_id res_id;
636         static const ldlm_policy_data_t lookup_policy =
637                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
638         static const ldlm_policy_data_t update_policy =
639                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
640         ldlm_policy_data_t const *policy = &lookup_policy;
641         ENTRY;
642
643         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
644                  einfo->ei_type);
645
646         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
647
648         if (it)
649                 flags |= LDLM_FL_HAS_INTENT;
650         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
651                 policy = &update_policy;
652
653         if (reqp)
654                 req = *reqp;
655
656         if (!it) {
657                 /* The only way right now is FLOCK, in this case we hide flock
658                    policy as lmm, but lmmsize is 0 */
659                 LASSERT(lmm && lmmsize == 0);
660                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
661                          einfo->ei_type);
662                 policy = (ldlm_policy_data_t *)lmm;
663                 res_id.name[3] = LDLM_FLOCK;
664         } else if (it->it_op & IT_OPEN) {
665                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
666                                            einfo->ei_cbdata);
667                 policy = &update_policy;
668                 einfo->ei_cbdata = NULL;
669                 lmm = NULL;
670         } else if (it->it_op & IT_UNLINK)
671                 req = mdc_intent_unlink_pack(exp, it, op_data);
672         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
673                 req = mdc_intent_getattr_pack(exp, it, op_data);
674         else if (it->it_op == IT_READDIR)
675                 req = ldlm_enqueue_pack(exp);
676         else {
677                 LBUG();
678                 RETURN(-EINVAL);
679         }
680
681         if (IS_ERR(req))
682                 RETURN(PTR_ERR(req));
683         pill = &req->rq_pill;
684
685         /* It is important to obtain rpc_lock first (if applicable), so that
686          * threads that are serialised with rpc_lock are not polluting our
687          * rpcs in flight counter. We do not do flock request limiting, though*/
688         if (it) {
689                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
690                 mdc_enter_request(&obddev->u.cli);
691         }
692
693         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
694                               0, lockh, 0);
695         if (reqp)
696                 *reqp = req;
697
698         if (it) {
699                 mdc_exit_request(&obddev->u.cli);
700                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
701         }
702         if (!it) {
703                 /* For flock requests we immediatelly return without further
704                    delay and let caller deal with the rest, since rest of
705                    this function metadata processing makes no sense for flock
706                    requests anyway */
707                 RETURN(rc);
708         }
709
710         if (rc < 0) {
711                 CERROR("ldlm_cli_enqueue: %d\n", rc);
712                 mdc_clear_replay_flag(req, rc);
713                 ptlrpc_req_finished(req);
714                 RETURN(rc);
715         }
716         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
717
718         RETURN(rc);
719 }
720
721 static int mdc_finish_intent_lock(struct obd_export *exp,
722                                   struct ptlrpc_request *request,
723                                   struct md_op_data *op_data,
724                                   struct lookup_intent *it,
725                                   struct lustre_handle *lockh)
726 {
727         struct lustre_handle old_lock;
728         struct mdt_body *mdt_body;
729         struct ldlm_lock *lock;
730         int rc;
731
732
733         LASSERT(request != NULL);
734         LASSERT(request != LP_POISON);
735         LASSERT(request->rq_repmsg != LP_POISON);
736
737         if (!it_disposition(it, DISP_IT_EXECD)) {
738                 /* The server failed before it even started executing the
739                  * intent, i.e. because it couldn't unpack the request. */
740                 LASSERT(it->d.lustre.it_status != 0);
741                 RETURN(it->d.lustre.it_status);
742         }
743         rc = it_open_error(DISP_IT_EXECD, it);
744         if (rc)
745                 RETURN(rc);
746
747         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
748         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
749
750         /* If we were revalidating a fid/name pair, mark the intent in
751          * case we fail and get called again from lookup */
752         if (fid_is_sane(&op_data->op_fid2) &&
753             it->it_create_mode & M_CHECK_STALE &&
754             it->it_op != IT_GETATTR) {
755                 it_set_disposition(it, DISP_ENQ_COMPLETE);
756
757                 /* Also: did we find the same inode? */
758                 /* sever can return one of two fids:
759                  * op_fid2 - new allocated fid - if file is created.
760                  * op_fid3 - existent fid - if file only open.
761                  * op_fid3 is saved in lmv_intent_open */
762                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
763                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
764                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
765                                "\n", PFID(&op_data->op_fid2),
766                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
767                         RETURN(-ESTALE);
768                 }
769         }
770
771         rc = it_open_error(DISP_LOOKUP_EXECD, it);
772         if (rc)
773                 RETURN(rc);
774
775         /* keep requests around for the multiple phases of the call
776          * this shows the DISP_XX must guarantee we make it into the call
777          */
778         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
779             it_disposition(it, DISP_OPEN_CREATE) &&
780             !it_open_error(DISP_OPEN_CREATE, it)) {
781                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
782                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
783         }
784         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
785             it_disposition(it, DISP_OPEN_OPEN) &&
786             !it_open_error(DISP_OPEN_OPEN, it)) {
787                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
788                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
789                 /* BUG 11546 - eviction in the middle of open rpc processing */
790                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
791         }
792
793         if (it->it_op & IT_CREAT) {
794                 /* XXX this belongs in ll_create_it */
795         } else if (it->it_op == IT_OPEN) {
796                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
797         } else {
798                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
799         }
800
801         /* If we already have a matching lock, then cancel the new
802          * one.  We have to set the data here instead of in
803          * mdc_enqueue, because we need to use the child's inode as
804          * the l_ast_data to match, and that's not available until
805          * intent_finish has performed the iget().) */
806         lock = ldlm_handle2lock(lockh);
807         if (lock) {
808                 ldlm_policy_data_t policy = lock->l_policy_data;
809                 LDLM_DEBUG(lock, "matching against this");
810
811                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
812                                          &lock->l_resource->lr_name),
813                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
814                          (unsigned long)lock->l_resource->lr_name.name[0],
815                          (unsigned long)lock->l_resource->lr_name.name[1],
816                          (unsigned long)lock->l_resource->lr_name.name[2],
817                          (unsigned long)fid_seq(&mdt_body->fid1),
818                          (unsigned long)fid_oid(&mdt_body->fid1),
819                          (unsigned long)fid_ver(&mdt_body->fid1));
820                 LDLM_LOCK_PUT(lock);
821
822                 memcpy(&old_lock, lockh, sizeof(*lockh));
823                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
824                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
825                         ldlm_lock_decref_and_cancel(lockh,
826                                                     it->d.lustre.it_lock_mode);
827                         memcpy(lockh, &old_lock, sizeof(old_lock));
828                         it->d.lustre.it_lock_handle = lockh->cookie;
829                 }
830         }
831         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
832                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
833                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
834         RETURN(rc);
835 }
836
837 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
838                         struct lu_fid *fid)
839 {
840         /* We could just return 1 immediately, but since we should only
841          * be called in revalidate_it if we already have a lock, let's
842          * verify that. */
843         struct ldlm_res_id res_id;
844         struct lustre_handle lockh;
845         ldlm_policy_data_t policy;
846         ldlm_mode_t mode;
847         ENTRY;
848
849         fid_build_reg_res_name(fid, &res_id);
850         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
851                                   MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
852
853         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
854                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
855                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
856         if (mode) {
857                 it->d.lustre.it_lock_handle = lockh.cookie;
858                 it->d.lustre.it_lock_mode = mode;
859         }
860
861         RETURN(!!mode);
862 }
863
864 /*
865  * This long block is all about fixing up the lock and request state
866  * so that it is correct as of the moment _before_ the operation was
867  * applied; that way, the VFS will think that everything is normal and
868  * call Lustre's regular VFS methods.
869  *
870  * If we're performing a creation, that means that unless the creation
871  * failed with EEXIST, we should fake up a negative dentry.
872  *
873  * For everything else, we want to lookup to succeed.
874  *
875  * One additional note: if CREATE or OPEN succeeded, we add an extra
876  * reference to the request because we need to keep it around until
877  * ll_create/ll_open gets called.
878  *
879  * The server will return to us, in it_disposition, an indication of
880  * exactly what d.lustre.it_status refers to.
881  *
882  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
883  * otherwise if DISP_OPEN_CREATE is set, then it status is the
884  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
885  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
886  * was successful.
887  *
888  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
889  * child lookup.
890  */
891 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
892                     void *lmm, int lmmsize, struct lookup_intent *it,
893                     int lookup_flags, struct ptlrpc_request **reqp,
894                     ldlm_blocking_callback cb_blocking,
895                     int extra_lock_flags)
896 {
897         struct lustre_handle lockh;
898         int rc = 0;
899         ENTRY;
900         LASSERT(it);
901
902         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
903                ", intent: %s flags %#o\n", op_data->op_namelen,
904                op_data->op_name, PFID(&op_data->op_fid2),
905                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
906                it->it_flags);
907
908         lockh.cookie = 0;
909         if (fid_is_sane(&op_data->op_fid2) &&
910             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
911                 /* We could just return 1 immediately, but since we should only
912                  * be called in revalidate_it if we already have a lock, let's
913                  * verify that. */
914                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
915                 /* Only return failure if it was not GETATTR by cfid
916                    (from inode_revalidate) */
917                 if (rc || op_data->op_namelen != 0)
918                         RETURN(rc);
919         }
920
921         /* lookup_it may be called only after revalidate_it has run, because
922          * revalidate_it cannot return errors, only zero.  Returning zero causes
923          * this call to lookup, which *can* return an error.
924          *
925          * We only want to execute the request associated with the intent one
926          * time, however, so don't send the request again.  Instead, skip past
927          * this and use the request from revalidate.  In this case, revalidate
928          * never dropped its reference, so the refcounts are all OK */
929         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
930                 struct ldlm_enqueue_info einfo =
931                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
932                           ldlm_completion_ast, NULL, NULL, NULL };
933
934                 /* For case if upper layer did not alloc fid, do it now. */
935                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
936                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
937                         if (rc < 0) {
938                                 CERROR("Can't alloc new fid, rc %d\n", rc);
939                                 RETURN(rc);
940                         }
941                 }
942                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
943                                  lmm, lmmsize, NULL, extra_lock_flags);
944                 if (rc < 0)
945                         RETURN(rc);
946         } else if (!fid_is_sane(&op_data->op_fid2) ||
947                    !(it->it_create_mode & M_CHECK_STALE)) {
948                 /* DISP_ENQ_COMPLETE set means there is extra reference on
949                  * request referenced from this intent, saved for subsequent
950                  * lookup.  This path is executed when we proceed to this
951                  * lookup, so we clear DISP_ENQ_COMPLETE */
952                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
953         }
954         *reqp = it->d.lustre.it_data;
955         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
956         RETURN(rc);
957 }
958
959 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
960                                               struct ptlrpc_request *req,
961                                               void *unused, int rc)
962 {
963         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
964         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
965         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
966         struct lookup_intent     *it;
967         struct lustre_handle     *lockh;
968         struct obd_device        *obddev;
969         int                       flags = LDLM_FL_HAS_INTENT;
970         ENTRY;
971
972         it    = &minfo->mi_it;
973         lockh = &minfo->mi_lockh;
974
975         obddev = class_exp2obd(exp);
976
977         mdc_exit_request(&obddev->u.cli);
978         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
979                 rc = -ETIMEDOUT;
980
981         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
982                                    &flags, NULL, 0, lockh, rc);
983         if (rc < 0) {
984                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
985                 mdc_clear_replay_flag(req, rc);
986                 GOTO(out, rc);
987         }
988
989         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
990         if (rc)
991                 GOTO(out, rc);
992
993         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
994         EXIT;
995
996 out:
997         OBD_FREE_PTR(einfo);
998         minfo->mi_cb(req, minfo, rc);
999         return 0;
1000 }
1001
1002 int mdc_intent_getattr_async(struct obd_export *exp,
1003                              struct md_enqueue_info *minfo,
1004                              struct ldlm_enqueue_info *einfo)
1005 {
1006         struct md_op_data       *op_data = &minfo->mi_data;
1007         struct lookup_intent    *it = &minfo->mi_it;
1008         struct ptlrpc_request   *req;
1009         struct obd_device       *obddev = class_exp2obd(exp);
1010         struct ldlm_res_id       res_id;
1011         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1012          *     for statahead currently. Consider CMD in future, such two bits
1013          *     maybe managed by different MDS, should be adjusted then. */
1014         ldlm_policy_data_t       policy = {
1015                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1016                                                          MDS_INODELOCK_UPDATE }
1017                                  };
1018         int                      rc;
1019         int                      flags = LDLM_FL_HAS_INTENT;
1020         ENTRY;
1021
1022         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1023                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1024                ldlm_it2str(it->it_op), it->it_flags);
1025
1026         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1027         req = mdc_intent_getattr_pack(exp, it, op_data);
1028         if (!req)
1029                 RETURN(-ENOMEM);
1030
1031         mdc_enter_request(&obddev->u.cli);
1032         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1033                               0, &minfo->mi_lockh, 1);
1034         if (rc < 0) {
1035                 mdc_exit_request(&obddev->u.cli);
1036                 RETURN(rc);
1037         }
1038
1039         req->rq_async_args.pointer_arg[0] = exp;
1040         req->rq_async_args.pointer_arg[1] = minfo;
1041         req->rq_async_args.pointer_arg[2] = einfo;
1042         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1043         ptlrpcd_add_req(req, PSCOPE_OTHER);
1044
1045         RETURN(0);
1046 }