Whamcloud - gitweb
b475a24712e021bff9faa7d1c66e9194b297f2fb
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <obd_support.h>
47 /* struct ptlrpc_request */
48 #include <lustre_net.h>
49 /* struct obd_export */
50 #include <lustre_export.h>
51 /* struct obd_device */
52 #include <obd.h>
53 /* lu2dt_dev() */
54 #include <dt_object.h>
55 /* struct mds_client_data */
56 #include "../mds/mds_internal.h"
57 #include "mdt_internal.h"
58
59 /*
60  * Initialized in mdt_mod_init().
61  */
62 unsigned long mdt_num_threads;
63
64 struct mdt_handler {
65         const char *mh_name;
66         int         mh_fail_id;
67         __u32       mh_opc;
68         __u32       mh_flags;
69         int (*mh_act)(struct mdt_thread_info *info);
70
71         const struct req_format *mh_fmt;
72 };
73
74 enum mdt_handler_flags {
75         /*
76          * struct mdt_body is passed in the incoming message, and object
77          * identified by this fid exists on disk.
78          */
79         HABEO_CORPUS = (1 << 0),
80         /*
81          * struct ldlm_request is passed in the incoming message.
82          */
83         HABEO_CLAVIS = (1 << 1),
84         /*
85          * this request has fixed reply format, so that reply message can be
86          * packed by generic code.
87          */
88         HABEO_REFERO = (1 << 2)
89 };
90
91 struct mdt_opc_slice {
92         __u32               mos_opc_start;
93         int                 mos_opc_end;
94         struct mdt_handler *mos_hs;
95 };
96
97 static struct mdt_opc_slice mdt_handlers[];
98
99 static int                    mdt_handle    (struct ptlrpc_request *req);
100 static struct mdt_device     *mdt_dev       (struct lu_device *d);
101 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
102
103 static struct lu_context_key       mdt_thread_key;
104 static struct lu_object_operations mdt_obj_ops;
105
106
107 static int mdt_getstatus(struct mdt_thread_info *info)
108 {
109         struct md_device *next  = info->mti_mdt->mdt_child;
110         int               result;
111         struct mdt_body  *body;
112
113         ENTRY;
114
115         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
116                 result = -ENOMEM;
117         else {
118                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
119                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
120                                                     next, &body->fid1);
121                 if (result == 0)
122                         body->valid |= OBD_MD_FLID;
123         }
124
125         /* the last_committed and last_xid fields are filled in for all
126          * replies already - no need to do so here also.
127          */
128         RETURN(result);
129 }
130
131 static int mdt_statfs(struct mdt_thread_info *info)
132 {
133         struct md_device  *next  = info->mti_mdt->mdt_child;
134         struct obd_statfs *osfs;
135         int                result;
136
137         ENTRY;
138
139         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
140                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
141                 result = -ENOMEM;
142         } else {
143                 osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
144                 /* XXX max_age optimisation is needed here. See mds_statfs */
145                 result = next->md_ops->mdo_statfs(info->mti_ctxt,
146                                                   next, &info->mti_sfs);
147                 statfs_pack(osfs, &info->mti_sfs);
148         }
149
150         RETURN(result);
151 }
152
153 void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
154 {
155         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
156                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
157                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
158
159         if (!S_ISREG(attr->la_mode))
160                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
161                             OBD_MD_FLMTIME;
162
163         b->atime      = attr->la_atime;
164         b->mtime      = attr->la_mtime;
165         b->ctime      = attr->la_ctime;
166         b->mode       = attr->la_mode;
167         b->size       = attr->la_size;
168         b->blocks     = attr->la_blocks;
169         b->uid        = attr->la_uid;
170         b->gid        = attr->la_gid;
171         b->flags      = attr->la_flags;
172         b->nlink      = attr->la_nlink;
173 }
174
175 static int mdt_getattr_pack_msg(struct mdt_thread_info *info)
176 {
177 #ifdef MDT_CODE
178         const struct mdt_body *body = info->mti_body;
179         struct req_capsule *pill = &info->mti_pill;
180 #endif
181         struct md_object *next = mdt_object_child(info->mti_object);
182         struct lu_attr *la = &info->mti_attr;
183         int rc;
184         ENTRY;
185
186         rc = mo_attr_get(info->mti_ctxt, next, la);
187         if (rc){
188                 RETURN(rc);
189         }
190 #ifdef MDT_CODE
191         if ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
192             (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA))) {
193                 rc = mo_xattr_get(info->mti_ctxt, next, NULL, 0, "lov");
194
195                 CDEBUG(D_INODE, "got %d bytes MD data for object "DFID3"\n",
196                        rc, PFID3(mdt_object_fid(info->mti_object)));
197                 if (rc < 0) {
198                         if (rc != -ENODATA) {
199                                 CERROR("error getting MD "DFID3": rc = %d\n",
200                                        PFID3(mdt_object_fid(info->mti_object)),
201                                        rc);
202                                 RETURN(rc);
203                         }
204                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
205                 } else if (rc > MAX_MD_SIZE) {
206                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
207                         CERROR("MD size %d larger than maximum possible %u\n",
208                                rc, MAX_MD_SIZE);
209                 } else {
210                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
211                 }
212         } else if (S_ISLNK(la->la_mode) && (body->valid & OBD_MD_LINKNAME)) {
213                 /* It also uese the mdt_md to hold symname */
214                 int len = min_t(int, la->la_size + 1, body->eadatasize);
215                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, len);
216         }
217
218 #ifdef CONFIG_FS_POSIX_ACL
219         if ((mdt_info_req(info)->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
220             (body->valid & OBD_MD_FLACL)) {
221
222                 rc = mo_xattr_get(info->mti_ctxt, next,
223                                   NULL, 0, XATTR_NAME_ACL_ACCESS);
224                 if (rc < 0) {
225                         if (rc != -ENODATA) {
226                                 CERROR("got acl size: %d\n", rc);
227                                 RETURN(rc);
228                         }
229                         req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, 0);
230                 } else
231                         req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, rc);
232         }
233 #endif
234 #endif
235         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
236                 CERROR("failed MDT_GETATTR_PACK test\n");
237                 RETURN(-ENOMEM);
238         }
239         rc = req_capsule_pack(&info->mti_pill);
240         if (rc) {
241                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
242                 RETURN(rc);
243         }
244
245         RETURN(0);
246 }
247
248 static int mdt_getattr_internal(struct mdt_thread_info *info)
249 {
250         struct md_object *next = mdt_object_child(info->mti_object);
251         const struct mdt_body  *reqbody = info->mti_body;
252         struct mdt_body  *repbody;
253         struct lu_attr *la = &info->mti_attr;
254         int rc;
255 #ifdef MDT_CODE
256         void *buffer;
257         int length;
258 #endif
259         ENTRY;
260
261         rc = mo_attr_get(info->mti_ctxt, next, la);
262         if (rc){
263                 CERROR("getattr error for "DFID3": %d\n",
264                         PFID3(&reqbody->fid1), rc);
265                 RETURN(rc);
266         }
267
268         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
269         mdt_pack_attr2body(repbody, la);
270         repbody->fid1 = *mdt_object_fid(info->mti_object);
271         repbody->valid |= OBD_MD_FLID;
272
273 #ifdef MDT_CODE
274         buffer = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
275         length = req_capsule_get_size(&info->mti_pill, &RMF_MDT_MD,
276                                       RCL_SERVER);
277
278         if ((S_ISREG(la->la_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
279             (S_ISDIR(la->la_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
280                 rc = mo_xattr_get(info->mti_ctxt, next,
281                                   buffer, length, "lov");
282                 if (rc < 0)
283                         RETURN(rc);
284
285                 if (S_ISDIR(la->la_mode))
286                         repbody->valid |= OBD_MD_FLDIREA;
287                 else
288                         repbody->valid |= OBD_MD_FLEASIZE;
289                 repbody->eadatasize = rc;
290         } else if (S_ISLNK(la->la_mode) &&
291                           (reqbody->valid & OBD_MD_LINKNAME) != 0) {
292                 /* FIXME How to readlink??
293                 rc = mo_xattr_get(info->mti_ctxt, next,
294                                   buffer, length, "readlink");
295                 */ rc = 10;
296                 if (rc < 0) {
297                         CERROR("readlink failed: %d\n", rc);
298                         RETURN(rc);
299                 } else {
300                         repbody->valid |= OBD_MD_LINKNAME;
301                         repbody->eadatasize = rc + 1;
302                         ((char*)buffer)[rc] = 0;        /* NULL terminate */
303                         CDEBUG(D_INODE, "read symlink dest %s\n", (char*)buffer);
304                 }
305         }
306
307         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
308                 repbody->max_cookiesize = MAX_MD_SIZE; /*FIXME*/
309                 repbody->max_mdsize = MAX_MD_SIZE;
310                 repbody->valid |= OBD_MD_FLMODEASIZE;
311         }
312
313
314 #ifdef CONFIG_FS_POSIX_ACL
315         if ((mdt_info_req(info)->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
316             (reqbody->valid & OBD_MD_FLACL)) {
317                 buffer = req_capsule_server_get(&info->mti_pill,
318                                                 &RMF_EADATA);
319                 length = req_capsule_get_size(&info->mti_pill,
320                                               &RMF_EADATA,
321                                               RCL_SERVER);
322                 rc = mo_xattr_get(info->mti_ctxt, next,
323                                   buffer, length, XATTR_NAME_ACL_ACCESS);
324
325                 if (rc < 0) {
326                         if (rc != -ENODATA) {
327                                 CERROR("got acl size: %d\n", rc);
328                                 RETURN(rc);
329                         }
330                         rc = 0;
331                 }
332                 repbody->aclsize = rc;
333                 repbody->valid |= OBD_MD_FLACL;
334         }
335 #endif
336 #endif
337         RETURN(0);
338 }
339
340 static int mdt_getattr(struct mdt_thread_info *info)
341 {
342         int result;
343
344         LASSERT(info->mti_object != NULL);
345         LASSERT(lu_object_assert_exists(info->mti_ctxt,
346                                         &info->mti_object->mot_obj.mo_lu));
347         ENTRY;
348
349         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
350                 CERROR(LUSTRE_MDT0_NAME": getattr lustre_pack_reply failed\n");
351                 result = -ENOMEM;
352         } else {
353                 result = mdt_getattr_pack_msg(info);
354                 if (result == 0)
355                         result = mdt_getattr_internal(info);
356         }
357         RETURN(result);
358 }
359
360 /* @ Huang Hua
361  * UPDATE lock should be taken against parent, and be release before exit;
362  * child_bits lock should be taken against child, and be returned back:
363  *            (1)normal request should release the child lock;
364  *            (2)intent request will grant the lock to client.
365  */
366 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
367                                  struct mdt_lock_handle *lhc,
368                                  __u64 child_bits)
369 {
370         struct mdt_object *parent = info->mti_object;
371         struct mdt_object *child;
372         struct md_object  *next = mdt_object_child(info->mti_object);
373         const char *name;
374         int result;
375         struct mdt_lock_handle *lhp;
376         struct lu_fid child_fid;
377         struct ldlm_namespace *ns;
378
379         ENTRY;
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         lhp->mlh_mode = LCK_CR;
383         lhc->mlh_mode = LCK_CR;
384
385         LASSERT(info->mti_object != NULL);
386
387         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
388         if (name == NULL)
389                 RETURN(-EFAULT);
390
391         ns = info->mti_mdt->mdt_namespace;
392         /*step 1: lock parent */
393         result = mdt_object_lock(ns, parent, lhp, MDS_INODELOCK_UPDATE);
394         if (result != 0)
395                 RETURN(result);
396
397         /*step 2: lookup child's fid by name */
398         result = mdo_lookup(info->mti_ctxt, next, name, &child_fid);
399         if (result != 0)
400                 GOTO(out_parent, result);
401
402         /*step 3: find the child object by fid */
403         child = mdt_object_find(info->mti_ctxt, info->mti_mdt, &child_fid);
404         if (IS_ERR(child))
405                 GOTO(out_parent, result = PTR_ERR(child));
406
407         /*step 4: lock child: this lock is returned back to caller
408          *                    if successfully get attr.
409          */
410         result = mdt_object_lock(ns, child, lhc, child_bits);
411         if (result != 0)
412                 GOTO(out_child, result);
413
414         /* finally, we can get attr for child. */
415         result = mdt_getattr_pack_msg(info);
416         if (result == 0) {
417                 struct ldlm_reply *ldlm_rep;
418                 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
419                 LASSERT(ldlm_rep);
420                 intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
421                 result = mdt_getattr_internal(info);
422                 if (result)
423                         intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
424                 else
425                         intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
426         }
427         if (result != 0)
428                 mdt_object_unlock(ns, child, lhc);
429
430 out_child:
431         mdt_object_put(info->mti_ctxt, child);
432 out_parent:
433         mdt_object_unlock(ns, parent, lhp);
434         RETURN(result);
435 }
436
437 /* normal handler: should release the child lock */
438 static int mdt_getattr_name(struct mdt_thread_info *info)
439 {
440         struct mdt_lock_handle lhc = {{0}};
441         int rc;
442
443         ENTRY;
444
445         rc = mdt_getattr_name_lock(info, &lhc, MDS_INODELOCK_UPDATE);
446         if (rc == 0 && lustre_handle_is_used(&lhc.mlh_lh))
447                 ldlm_lock_decref(&lhc.mlh_lh, lhc.mlh_mode);
448         RETURN(rc);
449 }
450
451 static struct lu_device_operations mdt_lu_ops;
452
453 static int lu_device_is_mdt(struct lu_device *d)
454 {
455         /*
456          * XXX for now. Tags in lu_device_type->ldt_something are needed.
457          */
458         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
459 }
460
461 static struct mdt_device *mdt_dev(struct lu_device *d)
462 {
463         LASSERT(lu_device_is_mdt(d));
464         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
465 }
466
467 static int mdt_connect(struct mdt_thread_info *info)
468 {
469         int result;
470         struct req_capsule *pill;
471         struct ptlrpc_request *req;
472
473         pill = &info->mti_pill;
474         req = mdt_info_req(info);
475         result = target_handle_connect(req, mdt_handle);
476         if (result == 0) {
477                 LASSERT(req->rq_export != NULL);
478                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
479         }
480         return result;
481 }
482
483 static int mdt_disconnect(struct mdt_thread_info *info)
484 {
485         return target_handle_disconnect(mdt_info_req(info));
486 }
487
488 static int mdt_readpage(struct mdt_thread_info *info)
489 {
490         return -EOPNOTSUPP;
491 }
492
493 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
494 {
495         int rc;
496         ENTRY;
497
498         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_UNPACK, -EFAULT);
499
500         rc = mdt_reint_unpack(info, op);
501         if (rc == 0) {
502                 struct mdt_reint_reply *rep;
503
504                 rep = &info->mti_reint_rep;
505                 rep->mrr_body = req_capsule_server_get(&info->mti_pill,
506                                                        &RMF_MDT_BODY);
507                 if (rep->mrr_body != NULL)
508                         /*
509                          * XXX fill other fields in @rep with pointers to
510                          * reply buffers.
511                          */
512                         rc = mdt_reint_rec(info);
513                 else
514                         rc = -EFAULT;
515         }
516
517         RETURN(rc);
518 }
519
520 static long mdt_reint_opcode(struct mdt_thread_info *info,
521                              const struct req_format **fmt)
522 {
523         __u32 *ptr;
524         long opc;
525
526         opc = -EINVAL;
527         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
528         if (ptr != NULL) {
529                 opc = *ptr;
530                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
531                 if (opc < REINT_MAX && fmt[opc] != NULL)
532                         req_capsule_extend(&info->mti_pill, fmt[opc]);
533                 else
534                         CERROR("Unsupported opc: %ld\n", opc);
535         }
536         return opc;
537 }
538
539 static int mdt_reint(struct mdt_thread_info *info)
540 {
541         long opc;
542         int  rc;
543
544         struct ptlrpc_request *req;
545
546         static const struct req_format *reint_fmts[REINT_MAX] = {
547                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
548                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
549                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
550                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
551                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
552                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
553         };
554
555         ENTRY;
556
557         req = mdt_info_req(info);
558         opc = mdt_reint_opcode(info, reint_fmts);
559         if (opc >= 0) {
560                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
561
562                 rc = req_capsule_pack(&info->mti_pill);
563                 if (rc == 0)
564                         rc = mdt_reint_internal(info, opc);
565         } else
566                 rc = opc;
567         RETURN(rc);
568 }
569
570 static int mdt_close(struct mdt_thread_info *info)
571 {
572         return -EOPNOTSUPP;
573 }
574
575 static int mdt_done_writing(struct mdt_thread_info *info)
576 {
577         return -EOPNOTSUPP;
578 }
579
580 static int mdt_pin(struct mdt_thread_info *info)
581 {
582         return -EOPNOTSUPP;
583 }
584
585 #ifdef MDT_CODE
586 static int mdt_device_sync(struct mdt_device *mdt)
587 {
588         return 0;
589 }
590
591 static int mdt_object_sync(struct mdt_object *m)
592 {
593         return 0;
594 }
595
596 static int mdt_sync(struct mdt_thread_info *info)
597 {
598         struct mdt_body *body;
599         struct req_capsule *pill = &info->mti_pill;
600         int rc;
601         ENTRY;
602
603         req_capsule_set(pill, &RQF_MDS_SYNC);
604
605         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
606         if (body == NULL)
607                 RETURN(-EINVAL);
608
609         if (fid_seq(&body->fid1) == 0) {
610                 /* sync the whole device */
611                 rc = req_capsule_pack(pill);
612                 if (rc == 0)
613                         rc = mdt_device_sync(info->mti_mdt);
614         } else {
615                 /* sync an object */
616                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
617                 if (rc != 0)
618                         RETURN(rc);
619
620                 rc = mdt_object_sync(info->mti_object);
621                 if (rc != 0)
622                         RETURN(rc);
623
624                 rc = mo_attr_get(info->mti_ctxt,
625                                  mdt_object_child(info->mti_object),
626                                  &info->mti_attr);
627                 if (rc != 0)
628                         RETURN(rc);
629
630                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
631                 mdt_pack_attr2body(body, &info->mti_attr);
632                 body->fid1 = *mdt_object_fid(info->mti_object);
633                 body->valid |= OBD_MD_FLID;
634         }
635         RETURN(rc);
636 }
637 #else
638 static int mdt_sync(struct mdt_thread_info *info)
639 {
640         return -EOPNOTSUPP;
641 }
642 #endif
643
644
645 static int mdt_handle_quotacheck(struct mdt_thread_info *info)
646 {
647         return -EOPNOTSUPP;
648 }
649
650 static int mdt_handle_quotactl(struct mdt_thread_info *info)
651 {
652         return -EOPNOTSUPP;
653 }
654
655 /*
656  * OBD PING and other handlers.
657  */
658
659 static int mdt_obd_ping(struct mdt_thread_info *info)
660 {
661         int result;
662         ENTRY;
663         result = target_handle_ping(mdt_info_req(info));
664         RETURN(result);
665 }
666
667 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
668 {
669         return -EOPNOTSUPP;
670 }
671
672 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
673 {
674         return -EOPNOTSUPP;
675 }
676
677
678 /*
679  * DLM handlers.
680  */
681
682 static struct ldlm_callback_suite cbs = {
683         .lcs_completion = ldlm_server_completion_ast,
684         .lcs_blocking   = ldlm_server_blocking_ast,
685         .lcs_glimpse    = NULL
686 };
687
688 static int mdt_enqueue(struct mdt_thread_info *info)
689 {
690         /*
691          * info->mti_dlm_req already contains swapped and (if necessary)
692          * converted dlm request.
693          */
694         LASSERT(info->mti_dlm_req != NULL);
695
696         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
697         return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
698                                     mdt_info_req(info),
699                                     info->mti_dlm_req, &cbs);
700 }
701
702 static int mdt_convert(struct mdt_thread_info *info)
703 {
704         LASSERT(info->mti_dlm_req);
705         return ldlm_handle_convert0(mdt_info_req(info), info->mti_dlm_req);
706 }
707
708 static int mdt_bl_callback(struct mdt_thread_info *info)
709 {
710         CERROR("bl callbacks should not happen on MDS\n");
711         LBUG();
712         return -EOPNOTSUPP;
713 }
714
715 static int mdt_cp_callback(struct mdt_thread_info *info)
716 {
717         CERROR("cp callbacks should not happen on MDS\n");
718         LBUG();
719         return -EOPNOTSUPP;
720 }
721
722 /*
723  * Build (DLM) resource name from fid.
724  */
725 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
726                                        struct ldlm_res_id *name)
727 {
728         memset(name, 0, sizeof *name);
729         name->name[0] = fid_seq(f);
730         name->name[1] = fid_oid(f);
731         name->name[2] = fid_ver(f);
732         return name;
733 }
734
735 /*
736  * Return true if resource is for object identified by fid.
737  */
738 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
739 {
740         return name->name[0] == fid_seq(f) &&
741                 name->name[1] == fid_oid(f) &&
742                 name->name[2] == fid_ver(f);
743 }
744
745 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
746 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
747              struct lustre_handle *lh, ldlm_mode_t mode,
748              ldlm_policy_data_t *policy)
749 {
750         struct ldlm_res_id res_id;
751         int flags = 0;
752         int rc;
753         ENTRY;
754
755         LASSERT(ns != NULL);
756         LASSERT(lh != NULL);
757         LASSERT(f != NULL);
758
759         /* FIXME: is that correct to have @flags=0 here? */
760         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
761                               LDLM_IBITS, policy, mode, &flags,
762                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
763                               NULL, NULL, 0, NULL, lh);
764         RETURN(rc == ELDLM_OK ? 0 : -EIO);
765 }
766
767 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
768                 struct lustre_handle *lh, ldlm_mode_t mode)
769 {
770         struct ldlm_lock *lock;
771         ENTRY;
772
773         /* FIXME: this is debug stuff, remove it later. */
774         lock = ldlm_handle2lock(lh);
775         if (!lock) {
776                 CERROR("invalid lock handle "LPX64, lh->cookie);
777                 LBUG();
778         }
779
780         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
781
782         ldlm_lock_decref(lh, mode);
783         EXIT;
784 }
785
786 static struct mdt_object *mdt_obj(struct lu_object *o)
787 {
788         LASSERT(lu_device_is_mdt(o->lo_dev));
789         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
790 }
791
792 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
793                                    struct mdt_device *d,
794                                    const struct lu_fid *f)
795 {
796         struct lu_object *o;
797
798         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
799         if (IS_ERR(o))
800                 return (struct mdt_object *)o;
801         else
802                 return mdt_obj(o);
803 }
804
805 void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *o)
806 {
807         lu_object_put(ctxt, &o->mot_obj.mo_lu);
808 }
809
810 const struct lu_fid *mdt_object_fid(struct mdt_object *o)
811 {
812         return lu_object_fid(&o->mot_obj.mo_lu);
813 }
814
815 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
816                     struct mdt_lock_handle *lh, __u64 ibits)
817 {
818         ldlm_policy_data_t p = {
819                 .l_inodebits = {
820                         .bits = ibits
821                 }
822         };
823         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
824         LASSERT(lh->mlh_mode != LCK_MINMODE);
825
826         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
827 }
828
829 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
830                               struct mdt_lock_handle *lh)
831 {
832         if (lustre_handle_is_used(&lh->mlh_lh)) {
833                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
834                 lh->mlh_lh.cookie = 0;
835         }
836 }
837
838 struct mdt_object *mdt_object_find_lock(const struct lu_context *ctxt,
839                                         struct mdt_device *d,
840                                         const struct lu_fid *f,
841                                         struct mdt_lock_handle *lh,
842                                         __u64 ibits)
843 {
844         struct mdt_object *o;
845
846         o = mdt_object_find(ctxt, d, f);
847         if (!IS_ERR(o)) {
848                 int result;
849
850                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
851                 if (result != 0) {
852                         mdt_object_put(ctxt, o);
853                         o = ERR_PTR(result);
854                 }
855         }
856         return o;
857 }
858
859
860 static struct mdt_handler *mdt_handler_find(__u32 opc)
861 {
862         struct mdt_opc_slice *s;
863         struct mdt_handler   *h;
864
865         h = NULL;
866         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
867                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
868                         h = s->mos_hs + (opc - s->mos_opc_start);
869                         if (h->mh_opc != 0)
870                                 LASSERT(h->mh_opc == opc);
871                         else
872                                 h = NULL; /* unsupported opc */
873                         break;
874                 }
875         }
876         return h;
877 }
878
879 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
880 {
881         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
882 }
883
884 static int mdt_lock_resname_compat(struct mdt_device *m,
885                                    struct ldlm_request *req)
886 {
887         /* XXX something... later. */
888         return 0;
889 }
890
891 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
892 {
893         /* XXX something... later. */
894         return 0;
895 }
896
897 /*
898  * Generic code handling requests that have struct mdt_body passed in:
899  *
900  *  - extract mdt_body from request and save it in @info, if present;
901  *
902  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
903  *  @info;
904  *
905  *  - if HABEO_CORPUS flag is set for this request type check whether object
906  *  actually exists on storage (lu_object_exists()).
907  *
908  */
909 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
910 {
911         int result;
912         const struct mdt_body    *body;
913         struct mdt_object        *obj;
914         const struct lu_context  *ctx;
915         const struct req_capsule *pill;
916
917         ctx = info->mti_ctxt;
918         pill = &info->mti_pill;
919
920         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
921         if (body != NULL) {
922                 if (fid_is_sane(&body->fid1)) {
923                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
924                         if (!IS_ERR(obj)) {
925                                 if ((flags & HABEO_CORPUS) &&
926                                     !lu_object_exists(ctx,
927                                                       &obj->mot_obj.mo_lu)) {
928                                         mdt_object_put(ctx, obj);
929                                         result = -ENOENT;
930                                 } else {
931                                         info->mti_object = obj;
932                                         result = 0;
933                                 }
934                         } else
935                                 result = PTR_ERR(obj);
936                 } else {
937                         CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
938                         result = -EINVAL;
939                 }
940         } else
941                 result = -EFAULT;
942         return result;
943 }
944
945 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
946 {
947         struct req_capsule *pill;
948         int result;
949
950         ENTRY;
951         pill = &info->mti_pill;
952
953         if (req_capsule_has_field(pill, &RMF_MDT_BODY))
954                 result = mdt_body_unpack(info, flags);
955         else
956                 result = 0;
957
958         if (result == 0 && (flags & HABEO_REFERO))
959                 result = req_capsule_pack(pill);
960
961         RETURN(result);
962 }
963
964 /*
965  * Invoke handler for this request opc. Also do necessary preprocessing
966  * (according to handler ->mh_flags), and post-processing (setting of
967  * ->last_{xid,committed}).
968  */
969 static int mdt_req_handle(struct mdt_thread_info *info,
970                           struct mdt_handler *h, struct ptlrpc_request *req)
971 {
972         int   result;
973         __u32 flags;
974
975         ENTRY;
976
977         LASSERT(h->mh_act != NULL);
978         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
979         LASSERT(current->journal_info == NULL);
980
981         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
982
983         if (h->mh_fail_id != 0)
984                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
985
986         result = 0;
987         flags = h->mh_flags;
988         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
989
990         req_capsule_init(&info->mti_pill,
991                          req, RCL_SERVER, info->mti_rep_buf_size);
992
993         if (h->mh_fmt != NULL) {
994                 req_capsule_set(&info->mti_pill, h->mh_fmt);
995                 if (result == 0)
996                         result = mdt_unpack_req_pack_rep(info, flags);
997         }
998
999         if (result == 0 && flags & HABEO_CLAVIS) {
1000                 struct ldlm_request *dlm_req;
1001
1002                 LASSERT(h->mh_fmt != NULL);
1003
1004                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1005                 if (dlm_req != NULL) {
1006                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
1007                                 result = mdt_lock_resname_compat(info->mti_mdt,
1008                                                                  dlm_req);
1009                         info->mti_dlm_req = dlm_req;
1010                 } else {
1011                         CERROR("Can't unpack dlm request\n");
1012                         result = -EFAULT;
1013                 }
1014         }
1015
1016         if (result == 0)
1017                 /*
1018                  * Process request.
1019                  */
1020                 result = h->mh_act(info);
1021         /*
1022          * XXX result value is unconditionally shoved into ->rq_status
1023          * (original code sometimes placed error code into ->rq_status, and
1024          * sometimes returned it to the
1025          * caller). ptlrpc_server_handle_request() doesn't check return value
1026          * anyway.
1027          */
1028         req->rq_status = result;
1029
1030         LASSERT(current->journal_info == NULL);
1031
1032         if (result == 0 && flags & HABEO_CLAVIS &&
1033             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
1034                 struct ldlm_reply *dlm_rep;
1035
1036                 dlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1037                 if (dlm_rep != NULL)
1038                         result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
1039         }
1040
1041         /* If we're DISCONNECTing, the mds_export_data is already freed */
1042         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
1043                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
1044                 target_committed_to_req(req);
1045         }
1046         req_capsule_fini(&info->mti_pill);
1047         RETURN(result);
1048 }
1049
1050 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1051 {
1052         lh->mlh_lh.cookie = 0ull;
1053         lh->mlh_mode = LCK_MINMODE;
1054 }
1055
1056 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1057 {
1058         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1059 }
1060
1061 static void mdt_thread_info_init(struct mdt_thread_info *info)
1062 {
1063         int i;
1064
1065         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1066         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1067                 info->mti_rep_buf_size[i] = 0;
1068         info->mti_rep_buf_nr = i;
1069         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1070                 mdt_lock_handle_init(&info->mti_lh[i]);
1071 }
1072
1073 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1074 {
1075         int i;
1076
1077         if (info->mti_object != NULL) {
1078                 mdt_object_put(info->mti_ctxt, info->mti_object);
1079                 info->mti_object = NULL;
1080         }
1081         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1082                 mdt_lock_handle_fini(&info->mti_lh[i]);
1083 }
1084
1085 static int mds_msg_check_version(struct lustre_msg *msg)
1086 {
1087         int rc;
1088
1089         /* TODO: enable the below check while really introducing msg version.
1090          * it's disabled because it will break compatibility with b1_4.
1091          */
1092         return (0);
1093
1094         switch (msg->opc) {
1095         case MDS_CONNECT:
1096         case MDS_DISCONNECT:
1097         case OBD_PING:
1098                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1099                 if (rc)
1100                         CERROR("bad opc %u version %08x, expecting %08x\n",
1101                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
1102                 break;
1103         case MDS_GETSTATUS:
1104         case MDS_GETATTR:
1105         case MDS_GETATTR_NAME:
1106         case MDS_STATFS:
1107         case MDS_READPAGE:
1108         case MDS_REINT:
1109         case MDS_CLOSE:
1110         case MDS_DONE_WRITING:
1111         case MDS_PIN:
1112         case MDS_SYNC:
1113         case MDS_GETXATTR:
1114         case MDS_SETXATTR:
1115         case MDS_SET_INFO:
1116         case MDS_QUOTACHECK:
1117         case MDS_QUOTACTL:
1118         case QUOTA_DQACQ:
1119         case QUOTA_DQREL:
1120                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1121                 if (rc)
1122                         CERROR("bad opc %u version %08x, expecting %08x\n",
1123                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
1124                 break;
1125         case LDLM_ENQUEUE:
1126         case LDLM_CONVERT:
1127         case LDLM_BL_CALLBACK:
1128         case LDLM_CP_CALLBACK:
1129                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1130                 if (rc)
1131                         CERROR("bad opc %u version %08x, expecting %08x\n",
1132                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1133                 break;
1134         case OBD_LOG_CANCEL:
1135         case LLOG_ORIGIN_HANDLE_CREATE:
1136         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1137         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1138         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1139         case LLOG_ORIGIN_HANDLE_CLOSE:
1140         case LLOG_CATINFO:
1141                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1142                 if (rc)
1143                         CERROR("bad opc %u version %08x, expecting %08x\n",
1144                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1145                 break;
1146         default:
1147                 CERROR("MDS unknown opcode %d\n", msg->opc);
1148                 rc = -ENOTSUPP;
1149         }
1150         return rc;
1151 }
1152
1153 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
1154                                        struct obd_device *obd, int *process)
1155 {
1156         switch (req->rq_reqmsg->opc) {
1157         case MDS_CONNECT: /* This will never get here, but for completeness. */
1158         case OST_CONNECT: /* This will never get here, but for completeness. */
1159         case MDS_DISCONNECT:
1160         case OST_DISCONNECT:
1161                *process = 1;
1162                RETURN(0);
1163
1164         case MDS_CLOSE:
1165         case MDS_SYNC: /* used in unmounting */
1166         case OBD_PING:
1167         case MDS_REINT:
1168         case LDLM_ENQUEUE:
1169                 *process = target_queue_recovery_request(req, obd);
1170                 RETURN(0);
1171
1172         default:
1173                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1174                 *process = 0;
1175                 /* XXX what should we set rq_status to here? */
1176                 req->rq_status = -EAGAIN;
1177                 RETURN(ptlrpc_error(req));
1178         }
1179 }
1180
1181 /*
1182  * Handle recovery. Return:
1183  *        +1: continue request processing;
1184  *       -ve: abort immediately with the given error code;
1185  *         0: send reply with error code in req->rq_status;
1186  */
1187 static int mdt_recovery(struct ptlrpc_request *req)
1188 {
1189         int recovering;
1190         int abort_recovery;
1191         struct obd_device *obd;
1192
1193         ENTRY;
1194
1195         if (req->rq_reqmsg->opc == MDS_CONNECT)
1196                 RETURN(+1);
1197
1198         if (req->rq_export == NULL) {
1199                 CERROR("operation %d on unconnected MDS from %s\n",
1200                        req->rq_reqmsg->opc,
1201                        libcfs_id2str(req->rq_peer));
1202                 req->rq_status = -ENOTCONN;
1203                 RETURN(0);
1204         }
1205
1206         /* sanity check: if the xid matches, the request must be marked as a
1207          * resent or replayed */
1208         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
1209                       lustre_msg_get_flags(req->rq_reqmsg) &
1210                       (MSG_RESENT | MSG_REPLAY)),
1211                  "rq_xid "LPU64" matches last_xid, "
1212                  "expected RESENT flag\n", req->rq_xid);
1213
1214         /* else: note the opposite is not always true; a RESENT req after a
1215          * failover will usually not match the last_xid, since it was likely
1216          * never committed. A REPLAYed request will almost never match the
1217          * last xid, however it could for a committed, but still retained,
1218          * open. */
1219
1220         obd = req->rq_export->exp_obd;
1221
1222         /* Check for aborted recovery... */
1223         spin_lock_bh(&obd->obd_processing_task_lock);
1224         abort_recovery = obd->obd_abort_recovery;
1225         recovering = obd->obd_recovering;
1226         spin_unlock_bh(&obd->obd_processing_task_lock);
1227         if (abort_recovery) {
1228                 target_abort_recovery(obd);
1229         } else if (recovering) {
1230                 int rc;
1231                 int should_process;
1232
1233                 rc = mdt_filter_recovery_request(req, obd, &should_process);
1234                 if (rc != 0 || !should_process) {
1235                         LASSERT(rc < 0);
1236                         RETURN(rc);
1237                 }
1238         }
1239         RETURN(+1);
1240 }
1241
1242 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
1243 {
1244         struct obd_device *obd;
1245
1246         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1247                 if (req->rq_reqmsg->opc != OBD_PING)
1248                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1249
1250                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1251                 if (obd && obd->obd_recovering) {
1252                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1253                         RETURN(target_queue_final_reply(req, req->rq_status));
1254                 } else {
1255                         /* Lost a race with recovery; let the error path
1256                          * DTRT. */
1257                         req->rq_status = -ENOTCONN;
1258                 }
1259         }
1260         target_send_reply(req, req->rq_status, info->mti_fail_id);
1261         RETURN(req->rq_status);
1262 }
1263
1264 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
1265 {
1266         struct mdt_handler *h;
1267         struct lustre_msg  *msg;
1268         int                 result;
1269
1270         ENTRY;
1271
1272         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1273
1274         LASSERT(current->journal_info == NULL);
1275
1276         msg = req->rq_reqmsg;
1277         result = mds_msg_check_version(msg);
1278         if (result == 0) {
1279                 result = mdt_recovery(req);
1280                 switch (result) {
1281                 case +1:
1282                         h = mdt_handler_find(msg->opc);
1283                         if (h != NULL)
1284                                 result = mdt_req_handle(info, h, req);
1285                         else {
1286                                 req->rq_status = -ENOTSUPP;
1287                                 result = ptlrpc_error(req);
1288                                 break;
1289                         }
1290                         /* fall through */
1291                 case 0:
1292                         result = mdt_reply(req, info);
1293                 }
1294         } else
1295                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1296         RETURN(result);
1297 }
1298
1299 /*
1300  * MDT handler function called by ptlrpc service thread when request comes.
1301  *
1302  * XXX common "target" functionality should be factored into separate module
1303  * shared by mdt, ost and stand-alone services like fld.
1304  */
1305 static int mdt_handle(struct ptlrpc_request *req)
1306 {
1307         int result;
1308         struct lu_context      *ctx;
1309         struct mdt_thread_info *info;
1310         ENTRY;
1311
1312         ctx = req->rq_svc_thread->t_ctx;
1313         LASSERT(ctx != NULL);
1314         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1315
1316         info = lu_context_key_get(ctx, &mdt_thread_key);
1317         LASSERT(info != NULL);
1318
1319         mdt_thread_info_init(info);
1320         /* it can be NULL while CONNECT */
1321         if (req->rq_export)
1322                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1323
1324         result = mdt_handle0(req, info);
1325         mdt_thread_info_fini(info);
1326         RETURN(result);
1327 }
1328
1329 /*Please move these functions from mds to mdt*/
1330 int intent_disposition(struct ldlm_reply *rep, int flag)
1331 {
1332         if (!rep)
1333                 return 0;
1334         return (rep->lock_policy_res1 & flag);
1335 }
1336
1337 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1338 {
1339         if (!rep)
1340                 return;
1341         rep->lock_policy_res1 |= flag;
1342 }
1343
1344 enum mdt_it_code {
1345         MDT_IT_OPEN,
1346         MDT_IT_OCREAT,
1347         MDT_IT_CREATE,
1348         MDT_IT_GETATTR,
1349         MDT_IT_READDIR,
1350         MDT_IT_LOOKUP,
1351         MDT_IT_UNLINK,
1352         MDT_IT_TRUNC,
1353         MDT_IT_GETXATTR,
1354         MDT_IT_NR
1355 };
1356
1357 static int mdt_intent_getattr(enum mdt_it_code opcode,
1358                               struct mdt_thread_info *info,
1359                               struct ldlm_lock **,
1360                               int);
1361 static int mdt_intent_reint(enum mdt_it_code opcode,
1362                             struct mdt_thread_info *info,
1363                             struct ldlm_lock **,
1364                             int);
1365
1366 static struct mdt_it_flavor {
1367         const struct req_format *it_fmt;
1368         __u32                    it_flags;
1369         int                    (*it_act)(enum mdt_it_code ,
1370                                          struct mdt_thread_info *,
1371                                          struct ldlm_lock **,
1372                                          int);
1373         long                     it_reint;
1374 } mdt_it_flavor[] = {
1375         [MDT_IT_OPEN]     = {
1376                 .it_fmt   = &RQF_LDLM_INTENT,
1377                 /*.it_flags = HABEO_REFERO,*/
1378                 .it_flags = 0,
1379                 .it_act   = mdt_intent_reint,
1380                 .it_reint = REINT_OPEN
1381         },
1382         [MDT_IT_OCREAT]   = {
1383                 .it_fmt   = &RQF_LDLM_INTENT,
1384                 .it_flags = 0,
1385                 .it_act   = mdt_intent_reint,
1386                 .it_reint = REINT_OPEN
1387         },
1388         [MDT_IT_CREATE]   = {
1389                 .it_fmt   = &RQF_LDLM_INTENT,
1390                 .it_flags = 0,
1391                 .it_act   = mdt_intent_reint,
1392                 .it_reint = REINT_CREATE
1393         },
1394         [MDT_IT_GETATTR]  = {
1395                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1396                 .it_flags = 0,
1397                 .it_act   = mdt_intent_getattr
1398         },
1399         [MDT_IT_READDIR]  = {
1400                 .it_fmt   = NULL,
1401                 .it_flags = 0,
1402                 .it_act   = NULL
1403         },
1404         [MDT_IT_LOOKUP]   = {
1405                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1406                 .it_flags = 0,
1407                 .it_act   = mdt_intent_getattr
1408         },
1409         [MDT_IT_UNLINK]   = {
1410                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
1411                 .it_flags = 0,
1412                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
1413                 .it_reint = REINT_UNLINK
1414         },
1415         [MDT_IT_TRUNC]    = {
1416                 .it_fmt   = NULL,
1417                 .it_flags = 0,
1418                 .it_act   = NULL
1419         },
1420         [MDT_IT_GETXATTR] = {
1421                 .it_fmt   = NULL,
1422                 .it_flags = 0,
1423                 .it_act   = NULL
1424         }
1425 };
1426
1427 static int mdt_intent_getattr(enum mdt_it_code opcode,
1428                               struct mdt_thread_info *info,
1429                               struct ldlm_lock **lockp,
1430                               int flags)
1431 {
1432         __u64  child_bits;
1433         struct ldlm_lock *old_lock = *lockp;
1434         struct ldlm_lock *new_lock = NULL;
1435         struct ptlrpc_request *req = mdt_info_req(info);
1436         struct ldlm_reply *ldlm_rep;
1437         struct mdt_lock_handle lhc = {{0}};
1438         int    rc;
1439
1440         ENTRY;
1441
1442         switch (opcode) {
1443         case MDT_IT_LOOKUP:
1444                 child_bits = MDS_INODELOCK_LOOKUP;
1445                 break;
1446         case MDT_IT_GETATTR:
1447                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1448                 break;
1449         default:
1450                 CERROR("Unhandled till now");
1451                 RETURN(-EINVAL);
1452                 break;
1453         }
1454
1455         rc = mdt_getattr_name_lock(info, &lhc, child_bits);
1456         if (rc)
1457                 RETURN(rc);
1458         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1459         ldlm_rep->lock_policy_res2 = rc;
1460
1461         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
1462
1463         if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1464                 ldlm_rep->lock_policy_res2 = 0;
1465         if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1466             ldlm_rep->lock_policy_res2) {
1467                 RETURN(ELDLM_LOCK_ABORTED);
1468         }
1469
1470         new_lock = ldlm_handle2lock(&lhc.mlh_lh);
1471         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1472                 RETURN(0);
1473
1474         *lockp = new_lock;
1475
1476         /* Fixup the lock to be given to the client */
1477         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1478         new_lock->l_readers = 0;
1479         new_lock->l_writers = 0;
1480
1481         new_lock->l_export = class_export_get(req->rq_export);
1482         list_add(&new_lock->l_export_chain,
1483                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1484
1485         new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1486         new_lock->l_completion_ast = old_lock->l_completion_ast;
1487
1488         new_lock->l_remote_handle = old_lock->l_remote_handle;
1489
1490         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1491
1492         LDLM_LOCK_PUT(new_lock);
1493         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1494
1495         RETURN(ELDLM_LOCK_REPLACED);
1496 }
1497
1498 static int mdt_intent_reint(enum mdt_it_code opcode,
1499                             struct mdt_thread_info *info,
1500                             struct ldlm_lock **lockp,
1501                             int flags)
1502 {
1503         long opc;
1504         struct ldlm_reply *rep;
1505
1506         static const struct req_format *intent_fmts[REINT_MAX] = {
1507                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
1508                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
1509         };
1510
1511         ENTRY;
1512
1513         opc = mdt_reint_opcode(info, intent_fmts);
1514         if (opc < 0)
1515                 RETURN(opc);
1516
1517         if (mdt_it_flavor[opcode].it_reint != opc) {
1518                 CERROR("Reint code %ld doesn't match intent: %d\n",
1519                        opc, opcode);
1520                 RETURN(-EPROTO);
1521         }
1522
1523         opc = req_capsule_pack(&info->mti_pill);
1524         if (opc)
1525                 RETURN(opc);
1526
1527         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1528         if (rep == NULL)
1529                 RETURN(-EFAULT);
1530         rep->lock_policy_res2 = mdt_reint_internal(info, opc);
1531         intent_set_disposition(rep, DISP_IT_EXECD);
1532
1533         RETURN(ELDLM_LOCK_ABORTED);
1534 }
1535
1536 static int mdt_intent_code(long itcode)
1537 {
1538         int result;
1539
1540         switch(itcode) {
1541         case IT_OPEN:
1542                 result = MDT_IT_OPEN;
1543                 break;
1544         case IT_OPEN|IT_CREAT:
1545                 result = MDT_IT_OCREAT;
1546                 break;
1547         case IT_CREAT:
1548                 result = MDT_IT_CREATE;
1549                 break;
1550         case IT_READDIR:
1551                 result = MDT_IT_READDIR;
1552                 break;
1553         case IT_GETATTR:
1554                 result = MDT_IT_GETATTR;
1555                 break;
1556         case IT_LOOKUP:
1557                 result = MDT_IT_LOOKUP;
1558                 break;
1559         case IT_UNLINK:
1560                 result = MDT_IT_UNLINK;
1561                 break;
1562         case IT_TRUNC:
1563                 result = MDT_IT_TRUNC;
1564                 break;
1565         case IT_GETXATTR:
1566                 result = MDT_IT_GETXATTR;
1567                 break;
1568         default:
1569                 CERROR("Unknown intent opcode: %ld\n", itcode);
1570                 result = -EINVAL;
1571                 break;
1572         }
1573         return result;
1574 }
1575
1576 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1577                           struct ldlm_lock **lockp, int flags)
1578 {
1579         int opc;
1580         int rc;
1581
1582         struct req_capsule   *pill;
1583         struct mdt_it_flavor *flv;
1584
1585         ENTRY;
1586
1587         opc = mdt_intent_code(itopc);
1588         if (opc < 0)
1589                 RETURN(-EINVAL);
1590
1591         pill = &info->mti_pill;
1592         flv  = &mdt_it_flavor[opc];
1593
1594         if (flv->it_fmt != NULL)
1595                 req_capsule_extend(pill, flv->it_fmt);
1596
1597         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1598         if (rc)
1599                 RETURN(rc);
1600
1601
1602          /* execute policy */
1603          /*XXX LASSERT( flv->it_act) */
1604          if (flv->it_act) {
1605                  rc = flv->it_act(opc, info, lockp, flags);
1606          } else
1607                  rc = -EOPNOTSUPP;
1608
1609         RETURN(rc);
1610 }
1611
1612 static int mdt_intent_policy(struct ldlm_namespace *ns,
1613                              struct ldlm_lock **lockp, void *req_cookie,
1614                              ldlm_mode_t mode, int flags, void *data)
1615 {
1616         struct mdt_thread_info *info;
1617         struct ptlrpc_request  *req  =  req_cookie;
1618         struct ldlm_intent     *it;
1619         struct req_capsule     *pill;
1620         struct ldlm_lock       *lock = *lockp;
1621         int rc;
1622
1623         ENTRY;
1624
1625         LASSERT(req != NULL);
1626
1627         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1628         LASSERT(info != NULL);
1629         pill = &info->mti_pill;
1630         LASSERT(pill->rc_req == req);
1631
1632         if (req->rq_reqmsg->bufcount > MDS_REQ_INTENT_IT_OFF) {
1633                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1634                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1635                 if (it != NULL) {
1636                         LDLM_DEBUG(lock, "intent policy opc: %s",
1637                                    ldlm_it2str(it->opc));
1638
1639                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
1640                         if (rc == 0)
1641                                 rc = ELDLM_OK;
1642                 } else
1643                         rc = -EFAULT;
1644         } else {
1645                 /* No intent was provided */
1646                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1647                 rc = req_capsule_pack(pill);
1648         }
1649         RETURN(rc);
1650 }
1651
1652 /*
1653  * Seq wrappers
1654  */
1655 static int mdt_seq_fini(const struct lu_context *ctx,
1656                         struct mdt_device *m)
1657 {
1658         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1659         ENTRY;
1660
1661         if (ls->ls_server_seq) {
1662                 seq_server_fini(ls->ls_server_seq, ctx);
1663                 OBD_FREE_PTR(ls->ls_server_seq);
1664                 ls->ls_server_seq = NULL;
1665         }
1666         if (ls->ls_client_seq) {
1667                 seq_client_fini(ls->ls_client_seq);
1668                 OBD_FREE_PTR(ls->ls_client_seq);
1669                 ls->ls_client_seq = NULL;
1670         }
1671         RETURN(0);
1672 }
1673
1674 static int mdt_seq_init(const struct lu_context *ctx,
1675                         const char *uuid, 
1676                         struct mdt_device *m)
1677 {
1678         struct lu_site *ls;
1679         int rc;
1680         ENTRY;
1681
1682         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1683
1684         OBD_ALLOC_PTR(ls->ls_server_seq);
1685
1686         if (ls->ls_server_seq != NULL) {
1687                 rc = seq_server_init(ls->ls_server_seq, 
1688                                       m->mdt_bottom, uuid,
1689                                      ctx);
1690         } else
1691                 rc = -ENOMEM;
1692
1693         if (rc)
1694                 mdt_seq_fini(ctx, m);
1695         RETURN(rc);
1696 }
1697
1698 /* XXX: this is ugly, should be something else */
1699 static int mdt_seq_ctlr_init(const struct lu_context *ctx,
1700                              struct mdt_device *m,
1701                              struct lustre_cfg *cfg)
1702 {
1703         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1704         struct obd_device *mdc;
1705         struct obd_uuid uuid;
1706         char *uuid_str;
1707         int rc, index;
1708         ENTRY;
1709
1710         index = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
1711
1712         /* check if this is first MDC add and controller is not yet
1713          * initialized. */
1714         if (index != 0 || ls->ls_controller)
1715                 RETURN(0);
1716
1717         uuid_str = lustre_cfg_string(cfg, 1);
1718         obd_str2uuid(&uuid, uuid_str);
1719         mdc = class_find_client_obd(&uuid, LUSTRE_MDC_NAME, NULL);
1720         if (!mdc) {
1721                 CERROR("can't find controller MDC by uuid %s\n",
1722                        uuid_str);
1723                 rc = -ENOENT;
1724         } else if (!mdc->obd_set_up) {
1725                 CERROR("target %s not set up\n", mdc->obd_name);
1726                 rc = -EINVAL;
1727         } else {
1728                 struct lustre_handle conn = {0, };
1729
1730                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1731                        mdc->obd_name, mdc->obd_uuid.uuid);
1732
1733                 rc = obd_connect(&conn, mdc, &mdc->obd_uuid, NULL);
1734
1735                 if (rc) {
1736                         CERROR("target %s connect error %d\n",
1737                                mdc->obd_name, rc);
1738                 } else {
1739                         ls->ls_controller = class_conn2export(&conn);
1740
1741                         OBD_ALLOC_PTR(ls->ls_client_seq);
1742
1743                         if (ls->ls_client_seq != NULL) {
1744                                 rc = seq_client_init(ls->ls_client_seq,
1745                                                      mdc->obd_name,
1746                                                      ls->ls_controller);
1747                         } else
1748                                 rc = -ENOMEM;
1749
1750                         if (rc)
1751                                 RETURN(rc);
1752
1753                         LASSERT(ls->ls_server_seq != NULL);
1754
1755                         rc = seq_server_set_ctlr(ls->ls_server_seq,
1756                                                  ls->ls_client_seq,
1757                                                  ctx);
1758                 }
1759         }
1760
1761         RETURN(rc);
1762 }
1763
1764 static void mdt_seq_ctlr_fini(struct mdt_device *m)
1765 {
1766         struct lu_site *ls;
1767
1768         ENTRY;
1769
1770         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1771         if (ls && ls->ls_controller) {
1772                 int rc;
1773
1774                 rc = obd_disconnect(ls->ls_controller);
1775                 ls->ls_controller = NULL;
1776                 if (rc != 0)
1777                         CERROR("Failure to disconnect obd: %d\n", rc);
1778         }
1779         EXIT;
1780 }
1781
1782 /*
1783  * FLD wrappers
1784  */
1785 static int mdt_fld_init(const struct lu_context *ctx,
1786                         const char *uuid, 
1787                         struct mdt_device *m)
1788 {
1789         struct lu_site *ls;
1790         int rc;
1791         ENTRY;
1792
1793         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1794
1795         OBD_ALLOC_PTR(ls->ls_fld);
1796
1797         if (ls->ls_fld != NULL)
1798                 rc = fld_server_init(ls->ls_fld, ctx,
1799                                      uuid, m->mdt_bottom);
1800         else
1801                 rc = -ENOMEM;
1802
1803         RETURN(rc);
1804 }
1805
1806 static int mdt_fld_fini(const struct lu_context *ctx,
1807                         struct mdt_device *m)
1808 {
1809         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1810         ENTRY;
1811
1812         if (ls && ls->ls_fld) {
1813                 fld_server_fini(ls->ls_fld, ctx);
1814                 OBD_FREE_PTR(ls->ls_fld);
1815         }
1816         RETURN(0);
1817 }
1818
1819 /* device init/fini methods */
1820 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1821 {
1822         if (m->mdt_service != NULL) {
1823                 ptlrpc_unregister_service(m->mdt_service);
1824                 m->mdt_service = NULL;
1825         }
1826 }
1827
1828 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1829 {
1830         int rc;
1831         struct ptlrpc_service_conf conf = {
1832                 .psc_nbufs            = MDS_NBUFS,
1833                 .psc_bufsize          = MDS_BUFSIZE,
1834                 .psc_max_req_size     = MDS_MAXREQSIZE,
1835                 .psc_max_reply_size   = MDS_MAXREPSIZE,
1836                 .psc_req_portal       = MDS_REQUEST_PORTAL,
1837                 .psc_rep_portal       = MDC_REPLY_PORTAL,
1838                 .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
1839                 /*
1840                  * We'd like to have a mechanism to set this on a per-device
1841                  * basis, but alas...
1842                  */
1843                 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
1844                                        MDT_MAX_THREADS)
1845         };
1846
1847         ENTRY;
1848
1849
1850         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1851                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1852
1853         m->mdt_service =
1854                 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
1855                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1856                                      NULL);
1857         if (m->mdt_service == NULL)
1858                 RETURN(-ENOMEM);
1859
1860         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1861         if (rc)
1862                 GOTO(err_mdt_svc, rc);
1863
1864         RETURN(rc);
1865 err_mdt_svc:
1866         ptlrpc_unregister_service(m->mdt_service);
1867         m->mdt_service = NULL;
1868
1869         RETURN(rc);
1870 }
1871
1872 static void mdt_stack_fini(const struct lu_context *ctx,
1873                            struct mdt_device *m, struct lu_device *d)
1874 {
1875         /* goes through all stack */
1876         while (d != NULL) {
1877                 struct lu_device *n;
1878                 struct obd_type *type;
1879                 struct lu_device_type *ldt = d->ld_type;
1880
1881                 lu_device_put(d);
1882
1883                 /* each fini() returns next device in stack of layers
1884                  * * so we can avoid the recursion */
1885                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
1886                 ldt->ldt_ops->ldto_device_free(ctx, d);
1887
1888                 type = ldt->ldt_obd_type;
1889                 type->typ_refcnt--;
1890                 class_put_type(type);
1891                 /* switch to the next device in the layer */
1892                 d = n;
1893         }
1894         m->mdt_child = NULL;
1895 }
1896
1897 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
1898                                          const char *typename,
1899                                          struct lu_device *child,
1900                                          struct lustre_cfg *cfg)
1901 {
1902         struct obd_type       *type;
1903         struct lu_device_type *ldt;
1904         struct lu_device      *d;
1905         int rc;
1906
1907         /* find the type */
1908         type = class_get_type(typename);
1909         if (!type) {
1910                 CERROR("Unknown type: '%s'\n", typename);
1911                 GOTO(out, rc = -ENODEV);
1912         }
1913
1914         ldt = type->typ_lu;
1915         if (ldt == NULL) {
1916                 CERROR("type: '%s'\n", typename);
1917                 GOTO(out_type, rc = -EINVAL);
1918         }
1919
1920         ldt->ldt_obd_type = type;
1921         d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
1922         if (IS_ERR(d)) {
1923                 CERROR("Cannot allocate device: '%s'\n", typename);
1924                 GOTO(out_type, rc = -ENODEV);
1925         }
1926
1927         LASSERT(child->ld_site);
1928         d->ld_site = child->ld_site;
1929
1930         type->typ_refcnt++;
1931         rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
1932         if (rc) {
1933                 CERROR("can't init device '%s', rc %d\n", typename, rc);
1934                 GOTO(out_alloc, rc);
1935         }
1936         lu_device_get(d);
1937
1938         RETURN(d);
1939 out_alloc:
1940         ldt->ldt_ops->ldto_device_free(ctx, d);
1941         type->typ_refcnt--;
1942 out_type:
1943         class_put_type(type);
1944 out:
1945         RETURN(ERR_PTR(rc));
1946 }
1947
1948 static int mdt_stack_init(const struct lu_context *ctx,
1949                           struct mdt_device *m, struct lustre_cfg *cfg)
1950 {
1951         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
1952         struct lu_device  *tmp;
1953         int rc;
1954
1955         /* init the stack */
1956         tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
1957         if (IS_ERR(tmp)) {
1958                 RETURN (PTR_ERR(tmp));
1959         }
1960         m->mdt_bottom = lu2dt_dev(tmp);
1961         d = tmp;
1962         tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
1963         if (IS_ERR(tmp)) {
1964                 GOTO(out, rc = PTR_ERR(tmp));
1965         }
1966         d = tmp;
1967         tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
1968         if (IS_ERR(tmp)) {
1969                 GOTO(out, rc = PTR_ERR(tmp));
1970         }
1971         d = tmp;
1972         m->mdt_child = lu2md_dev(d);
1973
1974         /* process setup config */
1975         tmp = &m->mdt_md_dev.md_lu_dev;
1976         rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
1977
1978 out:
1979         /* fini from last known good lu_device */
1980         if (rc)
1981                 mdt_stack_fini(ctx, m, d);
1982
1983         return rc;
1984 }
1985
1986 static void mdt_fini(struct mdt_device *m)
1987 {
1988         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1989         struct lu_context ctx;
1990         int rc;
1991
1992         ENTRY;
1993
1994         rc = lu_context_init(&ctx);
1995         if (rc != 0) {
1996                 CERROR("Cannot initialize context: %d\n", rc);
1997                 EXIT;
1998         }
1999
2000         lu_context_enter(&ctx);
2001
2002         mdt_stop_ptlrpc_service(m);
2003
2004         /* finish the stack */
2005         mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
2006
2007         mdt_fld_fini(&ctx, m);
2008         mdt_seq_fini(&ctx, m);
2009         mdt_seq_ctlr_fini(m);
2010
2011         LASSERT(atomic_read(&d->ld_ref) == 0);
2012         md_device_fini(&m->mdt_md_dev);
2013
2014         if (m->mdt_namespace != NULL) {
2015                 ldlm_namespace_free(m->mdt_namespace, 0);
2016                 m->mdt_namespace = NULL;
2017         }
2018
2019         if (d->ld_site != NULL) {
2020                 lu_site_fini(d->ld_site);
2021                 OBD_FREE_PTR(d->ld_site);
2022                 d->ld_site = NULL;
2023         }
2024
2025         lu_context_exit(&ctx);
2026         lu_context_fini(&ctx);
2027
2028         EXIT;
2029 }
2030
2031 static int mdt_init0(struct mdt_device *m,
2032                      struct lu_device_type *t, struct lustre_cfg *cfg)
2033 {
2034         int rc;
2035         struct lu_site *s;
2036         char   ns_name[48];
2037         struct lu_context ctx;
2038         const char *dev = lustre_cfg_string(cfg, 0);
2039         const char *num = lustre_cfg_string(cfg, 2);
2040         struct obd_device *obd;
2041         ENTRY;
2042
2043         obd = class_name2obd(dev);
2044         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2045
2046         OBD_ALLOC_PTR(s);
2047         if (s == NULL)
2048                 RETURN(-ENOMEM);
2049
2050         md_device_init(&m->mdt_md_dev, t);
2051         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2052
2053         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2054         if (rc) {
2055                 CERROR("can't init lu_site, rc %d\n", rc);
2056                 GOTO(err_fini_site, rc);
2057         }
2058
2059         rc = lu_context_init(&ctx);
2060         if (rc != 0)
2061                 GOTO(err_fini_site, rc);
2062
2063         lu_context_enter(&ctx);
2064
2065         /* init the stack */
2066         rc = mdt_stack_init(&ctx, m, cfg);
2067         if (rc) {
2068                 CERROR("can't init device stack, rc %d\n", rc);
2069                 GOTO(err_fini_ctx, rc);
2070         }
2071         lu_context_exit(&ctx);
2072         if (rc)
2073                 GOTO(err_fini_ctx, rc);
2074
2075         /* set server index */
2076         LASSERT(num);
2077         s->ls_node_id = simple_strtol(num, NULL, 10);
2078
2079         lu_context_enter(&ctx);
2080         rc = mdt_fld_init(&ctx, obd->obd_name, m);
2081         lu_context_exit(&ctx);
2082         if (rc)
2083                 GOTO(err_fini_stack, rc);
2084
2085         lu_context_enter(&ctx);
2086         rc = mdt_seq_init(&ctx, obd->obd_name, m);
2087         lu_context_exit(&ctx);
2088         if (rc)
2089                 GOTO(err_fini_fld, rc);
2090
2091         lu_context_fini(&ctx);
2092
2093         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
2094         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
2095         if (m->mdt_namespace == NULL)
2096                 GOTO(err_fini_seq, rc = -ENOMEM);
2097
2098         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2099
2100         rc = mdt_start_ptlrpc_service(m);
2101         if (rc)
2102                 GOTO(err_free_ns, rc);
2103
2104         RETURN(0);
2105
2106 err_free_ns:
2107         ldlm_namespace_free(m->mdt_namespace, 0);
2108         m->mdt_namespace = NULL;
2109 err_fini_seq:
2110         mdt_seq_fini(&ctx, m);
2111 err_fini_fld:
2112         mdt_fld_fini(&ctx, m);
2113 err_fini_stack:
2114         mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
2115 err_fini_ctx:
2116         lu_context_fini(&ctx);
2117 err_fini_site:
2118         lu_site_fini(s);
2119         OBD_FREE_PTR(s);
2120         RETURN(rc);
2121 }
2122
2123 /* used by MGS to process specific configurations */
2124 static int mdt_process_config(const struct lu_context *ctx,
2125                               struct lu_device *d, struct lustre_cfg *cfg)
2126 {
2127         struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
2128         int err;
2129         ENTRY;
2130
2131         switch (cfg->lcfg_command) {
2132         case LCFG_ADD_MDC:
2133                 /* add mdc hook to get first MDT uuid and connect it to
2134                  * ls->controller to use for seq manager. */
2135                 err = mdt_seq_ctlr_init(ctx, mdt_dev(d), cfg);
2136                 if (err) {
2137                         CERROR("can't initialize controller export, "
2138                                "rc %d\n", err);
2139                 }
2140                 /* all MDT specific commands should be here */
2141         default:
2142                 /* others are passed further */
2143                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
2144         }
2145         RETURN(err);
2146 }
2147
2148 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2149                                           const struct lu_object_header *hdr,
2150                                           struct lu_device *d)
2151 {
2152         struct mdt_object *mo;
2153
2154         ENTRY;
2155
2156         OBD_ALLOC_PTR(mo);
2157         if (mo != NULL) {
2158                 struct lu_object *o;
2159                 struct lu_object_header *h;
2160
2161                 o = &mo->mot_obj.mo_lu;
2162                 h = &mo->mot_header;
2163                 lu_object_header_init(h);
2164                 lu_object_init(o, h, d);
2165                 lu_object_add_top(h, o);
2166                 o->lo_ops = &mdt_obj_ops;
2167                 RETURN(o);
2168         } else
2169                 RETURN(NULL);
2170 }
2171
2172 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2173 {
2174         struct mdt_device *d = mdt_dev(o->lo_dev);
2175         struct lu_device  *under;
2176         struct lu_object  *below;
2177
2178         under = &d->mdt_child->md_lu_dev;
2179         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2180         if (below != NULL) {
2181                 lu_object_add(o, below);
2182                 return 0;
2183         } else
2184                 return -ENOMEM;
2185 }
2186
2187 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2188 {
2189         struct mdt_object *mo = mdt_obj(o);
2190         struct lu_object_header *h;
2191         ENTRY;
2192         h = o->lo_header;
2193         lu_object_fini(o);
2194         lu_object_header_fini(h);
2195         OBD_FREE_PTR(mo);
2196         EXIT;
2197 }
2198
2199 static int mdt_object_exists(const struct lu_context *ctx,
2200                              const struct lu_object *o)
2201 {
2202         return lu_object_exists(ctx, lu_object_next(o));
2203 }
2204
2205 static int mdt_object_print(const struct lu_context *ctxt,
2206                             struct seq_file *f, const struct lu_object *o)
2207 {
2208         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
2209 }
2210
2211 static struct lu_device_operations mdt_lu_ops = {
2212         .ldo_object_alloc   = mdt_object_alloc,
2213         .ldo_process_config = mdt_process_config
2214 };
2215
2216 static struct lu_object_operations mdt_obj_ops = {
2217         .loo_object_init    = mdt_object_init,
2218         .loo_object_free    = mdt_object_free,
2219         .loo_object_print   = mdt_object_print,
2220         .loo_object_exists  = mdt_object_exists
2221 };
2222
2223 /* mds_connect_internal */
2224 static int mdt_connect0(struct mdt_device *mdt,
2225                         struct obd_export *exp, struct obd_connect_data *data)
2226 {
2227         if (data != NULL) {
2228                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2229                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2230
2231                 /* If no known bits (which should not happen, probably,
2232                    as everybody should support LOOKUP and UPDATE bits at least)
2233                    revert to compat mode with plain locks. */
2234                 if (!data->ocd_ibits_known &&
2235                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
2236                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2237
2238                 if (!mdt->mdt_opts.mo_acl)
2239                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2240
2241                 if (!mdt->mdt_opts.mo_user_xattr)
2242                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2243
2244                 exp->exp_connect_flags = data->ocd_connect_flags;
2245                 data->ocd_version = LUSTRE_VERSION_CODE;
2246                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
2247         }
2248
2249         if (mdt->mdt_opts.mo_acl &&
2250             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2251                 CWARN("%s: MDS requires ACL support but client does not\n",
2252                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2253                 return -EBADE;
2254         }
2255         return 0;
2256 }
2257
2258 /* mds_connect copy */
2259 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
2260                            struct obd_uuid *cluuid,
2261                            struct obd_connect_data *data)
2262 {
2263         struct obd_export *exp;
2264         int rc;
2265         struct mdt_device *mdt;
2266         struct mds_export_data *med;
2267         struct mds_client_data *mcd = NULL;
2268         ENTRY;
2269
2270         if (!conn || !obd || !cluuid)
2271                 RETURN(-EINVAL);
2272
2273         mdt = mdt_dev(obd->obd_lu_dev);
2274
2275         rc = class_connect(conn, obd, cluuid);
2276         if (rc)
2277                 RETURN(rc);
2278
2279         exp = class_conn2export(conn);
2280         LASSERT(exp != NULL);
2281         med = &exp->exp_mds_data;
2282
2283         rc = mdt_connect0(mdt, exp, data);
2284         if (rc == 0) {
2285                 OBD_ALLOC_PTR(mcd);
2286                 if (mcd != NULL) {
2287                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2288                         med->med_mcd = mcd;
2289                 } else
2290                         rc = -ENOMEM;
2291         }
2292         if (rc)
2293                 class_disconnect(exp);
2294         else
2295                 class_export_put(exp);
2296
2297         RETURN(rc);
2298 }
2299
2300 static int mdt_obd_disconnect(struct obd_export *exp)
2301 {
2302         struct mds_export_data *med = &exp->exp_mds_data;
2303         unsigned long irqflags;
2304         int rc;
2305         ENTRY;
2306
2307         LASSERT(exp);
2308         class_export_get(exp);
2309
2310         /* Disconnect early so that clients can't keep using export */
2311         rc = class_disconnect(exp);
2312         //ldlm_cancel_locks_for_export(exp);
2313
2314         /* complete all outstanding replies */
2315         spin_lock_irqsave(&exp->exp_lock, irqflags);
2316         while (!list_empty(&exp->exp_outstanding_replies)) {
2317                 struct ptlrpc_reply_state *rs =
2318                         list_entry(exp->exp_outstanding_replies.next,
2319                                    struct ptlrpc_reply_state, rs_exp_list);
2320                 struct ptlrpc_service *svc = rs->rs_service;
2321
2322                 spin_lock(&svc->srv_lock);
2323                 list_del_init(&rs->rs_exp_list);
2324                 ptlrpc_schedule_difficult_reply(rs);
2325                 spin_unlock(&svc->srv_lock);
2326         }
2327         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
2328
2329         OBD_FREE_PTR(med->med_mcd);
2330
2331         class_export_put(exp);
2332         RETURN(rc);
2333 }
2334
2335 static struct obd_ops mdt_obd_device_ops = {
2336         .o_owner = THIS_MODULE,
2337         .o_connect = mdt_obd_connect,
2338         .o_disconnect = mdt_obd_disconnect,
2339 };
2340
2341 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
2342 {
2343         struct mdt_device *m = mdt_dev(d);
2344
2345         mdt_fini(m);
2346         OBD_FREE_PTR(m);
2347 }
2348
2349 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2350                                           struct lu_device_type *t,
2351                                           struct lustre_cfg *cfg)
2352 {
2353         struct lu_device  *l;
2354         struct mdt_device *m;
2355
2356         OBD_ALLOC_PTR(m);
2357         if (m != NULL) {
2358                 int result;
2359
2360                 l = &m->mdt_md_dev.md_lu_dev;
2361                 result = mdt_init0(m, t, cfg);
2362                 if (result != 0) {
2363                         mdt_device_free(ctx, l);
2364                         return ERR_PTR(result);
2365                 }
2366
2367         } else
2368                 l = ERR_PTR(-ENOMEM);
2369         return l;
2370 }
2371
2372 /*
2373  * context key constructor/destructor
2374  */
2375
2376 static void *mdt_thread_init(const struct lu_context *ctx,
2377                              struct lu_context_key *key)
2378 {
2379         struct mdt_thread_info *info;
2380
2381         /*
2382          * check that no high order allocations are incurred.
2383          */
2384         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2385         OBD_ALLOC_PTR(info);
2386         if (info != NULL)
2387                 info->mti_ctxt = ctx;
2388         else
2389                 info = ERR_PTR(-ENOMEM);
2390         return info;
2391 }
2392
2393 static void mdt_thread_fini(const struct lu_context *ctx,
2394                             struct lu_context_key *key, void *data)
2395 {
2396         struct mdt_thread_info *info = data;
2397         OBD_FREE_PTR(info);
2398 }
2399
2400 static struct lu_context_key mdt_thread_key = {
2401         .lct_init = mdt_thread_init,
2402         .lct_fini = mdt_thread_fini
2403 };
2404
2405 static int mdt_type_init(struct lu_device_type *t)
2406 {
2407         return lu_context_key_register(&mdt_thread_key);
2408 }
2409
2410 static void mdt_type_fini(struct lu_device_type *t)
2411 {
2412         lu_context_key_degister(&mdt_thread_key);
2413 }
2414
2415 static struct lu_device_type_operations mdt_device_type_ops = {
2416         .ldto_init = mdt_type_init,
2417         .ldto_fini = mdt_type_fini,
2418
2419         .ldto_device_alloc = mdt_device_alloc,
2420         .ldto_device_free  = mdt_device_free
2421 };
2422
2423 static struct lu_device_type mdt_device_type = {
2424         .ldt_tags = LU_DEVICE_MD,
2425         .ldt_name = LUSTRE_MDT0_NAME,
2426         .ldt_ops  = &mdt_device_type_ops
2427 };
2428
2429 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
2430         { 0 }
2431 };
2432
2433 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
2434         { 0 }
2435 };
2436
2437 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
2438
2439 static int __init mdt_mod_init(void)
2440 {
2441         struct lprocfs_static_vars lvars;
2442
2443         mdt_num_threads = MDT_NUM_THREADS;
2444         lprocfs_init_vars(mdt, &lvars);
2445         return class_register_type(&mdt_obd_device_ops, NULL,
2446                                    lvars.module_vars, LUSTRE_MDT0_NAME,
2447                                    &mdt_device_type);
2448 }
2449
2450 static void __exit mdt_mod_exit(void)
2451 {
2452         class_unregister_type(LUSTRE_MDT0_NAME);
2453 }
2454
2455
2456 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)             \
2457 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
2458         .mh_name    = #opc,                                             \
2459         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
2460         .mh_opc     = prefix ## _  ## opc,                              \
2461         .mh_flags   = flags,                                            \
2462         .mh_act     = fn,                                               \
2463         .mh_fmt     = fmt                                               \
2464 }
2465
2466 #define DEF_MDT_HNDL(flags, name, fn, fmt)                                  \
2467         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
2468 /*
2469  * Request with a format known in advance
2470  */
2471 #define DEF_MDT_HNDL_F(flags, name, fn)                                 \
2472         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
2473 /*
2474  * Request with a format we do not yet know
2475  */
2476 #define DEF_MDT_HNDL_0(flags, name, fn)                                 \
2477         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
2478
2479 static struct mdt_handler mdt_mds_ops[] = {
2480 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
2481 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
2482 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
2483 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETATTR,      mdt_getattr),
2484 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETATTR_NAME, mdt_getattr_name),
2485 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, SETXATTR,     mdt_setxattr),
2486 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
2487 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
2488 DEF_MDT_HNDL_0(HABEO_CORPUS,              READPAGE,     mdt_readpage),
2489 DEF_MDT_HNDL_F(0,                         REINT,        mdt_reint),
2490 DEF_MDT_HNDL_0(HABEO_CORPUS,              CLOSE,        mdt_close),
2491 DEF_MDT_HNDL_0(0,                         DONE_WRITING, mdt_done_writing),
2492 DEF_MDT_HNDL_0(0,                         PIN,          mdt_pin),
2493 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
2494 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_handle_quotacheck),
2495 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_handle_quotactl)
2496 };
2497
2498 #define DEF_OBD_HNDL(flags, name, fn)                   \
2499         DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
2500
2501
2502 static struct mdt_handler mdt_obd_ops[] = {
2503         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
2504         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
2505         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
2506 };
2507
2508 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
2509         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
2510 #define DEF_DLM_HNDL_F(flags, name, fn)                   \
2511         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
2512
2513 static struct mdt_handler mdt_dlm_ops[] = {
2514         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
2515         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
2516         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
2517         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
2518 };
2519
2520 static struct mdt_handler mdt_llog_ops[] = {
2521 };
2522
2523 static struct mdt_opc_slice mdt_handlers[] = {
2524         {
2525                 .mos_opc_start = MDS_GETATTR,
2526                 .mos_opc_end   = MDS_LAST_OPC,
2527                 .mos_hs        = mdt_mds_ops
2528         },
2529         {
2530                 .mos_opc_start = OBD_PING,
2531                 .mos_opc_end   = OBD_LAST_OPC,
2532                 .mos_hs        = mdt_obd_ops
2533         },
2534         {
2535                 .mos_opc_start = LDLM_ENQUEUE,
2536                 .mos_opc_end   = LDLM_LAST_OPC,
2537                 .mos_hs        = mdt_dlm_ops
2538         },
2539         {
2540                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
2541                 .mos_opc_end   = LLOG_LAST_OPC,
2542                 .mos_hs        = mdt_llog_ops
2543         },
2544         {
2545                 .mos_hs        = NULL
2546         }
2547 };
2548
2549 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2550 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
2551 MODULE_LICENSE("GPL");
2552
2553 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
2554                 "number of mdt service threads to start");
2555
2556 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);