Whamcloud - gitweb
- changes about fids:
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53
54 #include <linux/lu_object.h>
55
56 /* struct mds_client_data */
57 #include "../mds/mds_internal.h"
58 #include "mdt_internal.h"
59
60 /*
61  * Initialized in mdt_mod_init().
62  */
63 unsigned long mdt_num_threads;
64
65 static int mdt_handle(struct ptlrpc_request *req);
66 static struct ptlrpc_thread_key mdt_thread_key;
67
68 static int mdt_getstatus(struct mdt_thread_info *info,
69                          struct ptlrpc_request *req, int offset)
70 {
71         struct md_device *mdd  = info->mti_mdt->mdt_child;
72         struct mdt_body  *body;
73         int               size = sizeof *body;
74         int               result;
75
76         ENTRY;
77
78         result = lustre_pack_reply(req, 1, &size, NULL);
79         if (result)
80                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
81                        size);
82         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
83                 result = -ENOMEM;
84         else {
85                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
86                 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
87         }
88
89         /* the last_committed and last_xid fields are filled in for all
90          * replies already - no need to do so here also.
91          */
92         RETURN(result);
93 }
94
95 static int mdt_connect(struct mdt_thread_info *info,
96                        struct ptlrpc_request *req, int offset)
97 {
98         return target_handle_connect(req, mdt_handle);
99 }
100
101 static int mdt_disconnect(struct mdt_thread_info *info,
102                           struct ptlrpc_request *req, int offset)
103 {
104         return -EOPNOTSUPP;
105 }
106
107 static int mdt_getattr(struct mdt_thread_info *info,
108                        struct ptlrpc_request *req, int offset)
109 {
110         return -EOPNOTSUPP;
111 }
112
113 static int mdt_getattr_name(struct mdt_thread_info *info,
114                             struct ptlrpc_request *req, int offset)
115 {
116         return -EOPNOTSUPP;
117 }
118
119 static int mdt_setxattr(struct mdt_thread_info *info,
120                         struct ptlrpc_request *req, int offset)
121 {
122         return -EOPNOTSUPP;
123 }
124
125 static int mdt_getxattr(struct mdt_thread_info *info,
126                         struct ptlrpc_request *req, int offset)
127 {
128         return -EOPNOTSUPP;
129 }
130
131 static int mdt_statfs(struct mdt_thread_info *info,
132                       struct ptlrpc_request *req, int offset)
133 {
134         return -EOPNOTSUPP;
135 }
136
137 static int mdt_readpage(struct mdt_thread_info *info,
138                        struct ptlrpc_request *req, int offset)
139 {
140         return -EOPNOTSUPP;
141 }
142
143 static int mdt_reint(struct mdt_thread_info *info,
144                      struct ptlrpc_request *req, int offset)
145 {
146         return -EOPNOTSUPP;
147 }
148
149 static int mdt_close(struct mdt_thread_info *info,
150                      struct ptlrpc_request *req, int offset)
151 {
152         return -EOPNOTSUPP;
153 }
154
155 static int mdt_done_writing(struct mdt_thread_info *info,
156                             struct ptlrpc_request *req, int offset)
157 {
158         return -EOPNOTSUPP;
159 }
160
161 static int mdt_pin(struct mdt_thread_info *info,
162                    struct ptlrpc_request *req, int offset)
163 {
164         return -EOPNOTSUPP;
165 }
166
167 static int mdt_sync(struct mdt_thread_info *info,
168                     struct ptlrpc_request *req, int offset)
169 {
170         return -EOPNOTSUPP;
171 }
172
173 static int mdt_set_info(struct mdt_thread_info *info,
174                         struct ptlrpc_request *req, int offset)
175 {
176         return -EOPNOTSUPP;
177 }
178
179 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
180                                  struct ptlrpc_request *req, int offset)
181 {
182         return -EOPNOTSUPP;
183 }
184
185 static int mdt_handle_quotactl(struct mdt_thread_info *info,
186                                struct ptlrpc_request *req, int offset)
187 {
188         return -EOPNOTSUPP;
189 }
190
191 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
192 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
193              struct lustre_handle *lh, ldlm_mode_t mode,
194              ldlm_policy_data_t *policy)
195 {
196         struct ldlm_res_id res_id = { .name = {0} };
197         int flags = 0, rc;
198         ENTRY;
199
200         LASSERT(ns != NULL);
201         LASSERT(lh != NULL);
202         LASSERT(f != NULL);
203
204         /* we use fid_num() whoch includes also object version instread of raw
205          * fid_oid(). */
206         res_id.name[0] = fid_seq(f);
207         res_id.name[1] = fid_num(f);
208
209         /* FIXME: is that correct to have @flags=0 here? */
210         rc = ldlm_cli_enqueue(NULL, NULL, ns, res_id, LDLM_IBITS, policy,
211                               mode, &flags, ldlm_blocking_ast,
212                               ldlm_completion_ast, NULL, NULL,
213                               NULL, 0, NULL, lh);
214         RETURN (rc == ELDLM_OK ? 0 : -EIO);
215 }
216
217 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
218                 struct lustre_handle *lh, ldlm_mode_t mode)
219 {
220         struct ldlm_lock *lock;
221         ENTRY;
222
223         /* FIXME: this is debug stuff, remove it later. */
224         lock = ldlm_handle2lock(lh);
225         if (!lock) {
226                 CERROR("invalid lock handle "LPX64, lh->cookie);
227                 LBUG();
228         }
229
230         LASSERT(fid_seq(f) == lock->l_resource->lr_name.name[0] &&
231                 fid_num(f) == lock->l_resource->lr_name.name[1]);
232
233         ldlm_lock_decref(lh, mode);
234         EXIT;
235 }
236
237 static struct lu_device_operations mdt_lu_ops;
238
239 static int lu_device_is_mdt(struct lu_device *d)
240 {
241         /*
242          * XXX for now. Tags in lu_device_type->ldt_something are needed.
243          */
244         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
245 }
246
247 static struct mdt_object *mdt_obj(struct lu_object *o)
248 {
249         LASSERT(lu_device_is_mdt(o->lo_dev));
250         return container_of(o, struct mdt_object, mot_obj.mo_lu);
251 }
252
253 struct mdt_object *mdt_object_find(struct mdt_device *d, struct lu_fid *f)
254 {
255         struct lu_object *o;
256
257         o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
258         if (IS_ERR(o))
259                 return (struct mdt_object *)o;
260         else
261                 return mdt_obj(o);
262 }
263
264 void mdt_object_put(struct mdt_object *o)
265 {
266         lu_object_put(&o->mot_obj.mo_lu);
267 }
268
269 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
270 {
271         return lu_object_fid(&o->mot_obj.mo_lu);
272 }
273
274 static int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
275                            struct mdt_lock_handle *lh, __u64 ibits)
276 {
277         ldlm_policy_data_t p = {
278                 .l_inodebits = {
279                         .bits = ibits
280                 }
281         };
282         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
283         LASSERT(lh->mlh_mode != LCK_MINMODE);
284
285         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
286 }
287
288 static void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
289                               struct mdt_lock_handle *lh)
290 {
291         if (lustre_handle_is_used(&lh->mlh_lh)) {
292                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
293                 lh->mlh_lh.cookie = 0;
294         }
295 }
296
297 struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct lu_fid *f,
298                                         struct mdt_lock_handle *lh, __u64 ibits)
299 {
300         struct mdt_object *o;
301
302         o = mdt_object_find(d, f);
303         if (!IS_ERR(o)) {
304                 int result;
305
306                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
307                 if (result != 0) {
308                         mdt_object_put(o);
309                         o = ERR_PTR(result);
310                 }
311         }
312         return o;
313 }
314
315 struct mdt_handler {
316         const char *mh_name;
317         int         mh_fail_id;
318         __u32       mh_opc;
319         __u32       mh_flags;
320         int (*mh_act)(struct mdt_thread_info *info,
321                       struct ptlrpc_request *req, int offset);
322 };
323
324 enum mdt_handler_flags {
325         /*
326          * struct mdt_body is passed in the 0-th incoming buffer.
327          */
328         HABEO_CORPUS = (1 << 0)
329 };
330
331 struct mdt_opc_slice {
332         __u32               mos_opc_start;
333         int                 mos_opc_end;
334         struct mdt_handler *mos_hs;
335 };
336
337 static struct mdt_opc_slice mdt_handlers[];
338
339 struct mdt_handler *mdt_handler_find(__u32 opc)
340 {
341         struct mdt_opc_slice *s;
342         struct mdt_handler   *h;
343
344         h = NULL;
345         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
346                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
347                         h = s->mos_hs + (opc - s->mos_opc_start);
348                         if (h->mh_opc != 0)
349                                 LASSERT(h->mh_opc == opc);
350                         else
351                                 h = NULL; /* unsupported opc */
352                         break;
353                 }
354         }
355         return h;
356 }
357
358 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
359 {
360         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
361 }
362
363 /*
364  * Invoke handler for this request opc. Also do necessary preprocessing
365  * (according to handler ->mh_flags), and post-processing (setting of
366  * ->last_{xid,committed}).
367  */
368 static int mdt_req_handle(struct mdt_thread_info *info,
369                           struct mdt_handler *h, struct ptlrpc_request *req,
370                           int shift)
371 {
372         int result;
373         int off;
374
375         ENTRY;
376
377         LASSERT(h->mh_act != NULL);
378         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
379         LASSERT(current->journal_info == NULL);
380
381         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
382
383         if (h->mh_fail_id != 0)
384                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
385
386         off = MDS_REQ_REC_OFF + shift;
387         result = 0;
388         if (h->mh_flags & HABEO_CORPUS) {
389                 info->mti_body = lustre_swab_reqbuf(req, off,
390                                                     sizeof *info->mti_body,
391                                                     lustre_swab_mdt_body);
392                 if (info->mti_body == NULL) {
393                         CERROR("Can't unpack body\n");
394                         result = req->rq_status = -EFAULT;
395                 }
396                 info->mti_object = mdt_object_find(info->mti_mdt,
397                                                    &info->mti_body->fid1);
398                 if (IS_ERR(info->mti_object))
399                         result = PTR_ERR(info->mti_object);
400         }
401         if (result == 0)
402                 /*
403                  * Process request.
404                  */
405                 result = h->mh_act(info, req, off);
406         /*
407          * XXX result value is unconditionally shoved into ->rq_status
408          * (original code sometimes placed error code into ->rq_status, and
409          * sometimes returned it to the
410          * caller). ptlrpc_server_handle_request() doesn't check return value
411          * anyway.
412          */
413         req->rq_status = result;
414
415         LASSERT(current->journal_info == NULL);
416
417         /* If we're DISCONNECTing, the mds_export_data is already freed */
418         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
419                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
420                 target_committed_to_req(req);
421         }
422         RETURN(result);
423 }
424
425 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
426 {
427         lh->mlh_lh.cookie = 0ull;
428         lh->mlh_mode = LCK_MINMODE;
429 }
430
431 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
432 {
433         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
434 }
435
436 static void mdt_thread_info_init(struct mdt_thread_info *info)
437 {
438         int i;
439
440         memset(info, 0, sizeof *info);
441         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
442         /*
443          * Poison size array.
444          */
445         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
446                 info->mti_rep_buf_size[i] = ~0;
447         info->mti_rep_buf_nr = i;
448         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
449                 mdt_lock_handle_init(&info->mti_lh[i]);
450 }
451
452 static void mdt_thread_info_fini(struct mdt_thread_info *info)
453 {
454         int i;
455
456         if (info->mti_object != NULL) {
457                 mdt_object_put(info->mti_object);
458                 info->mti_object = NULL;
459         }
460         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
461                 mdt_lock_handle_fini(&info->mti_lh[i]);
462 }
463
464 static int mds_msg_check_version(struct lustre_msg *msg)
465 {
466         int rc;
467
468         /* TODO: enable the below check while really introducing msg version.
469          * it's disabled because it will break compatibility with b1_4.
470          */
471         return (0);
472
473         switch (msg->opc) {
474         case MDS_CONNECT:
475         case MDS_DISCONNECT:
476         case OBD_PING:
477                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
478                 if (rc)
479                         CERROR("bad opc %u version %08x, expecting %08x\n",
480                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
481                 break;
482         case MDS_GETSTATUS:
483         case MDS_GETATTR:
484         case MDS_GETATTR_NAME:
485         case MDS_STATFS:
486         case MDS_READPAGE:
487         case MDS_REINT:
488         case MDS_CLOSE:
489         case MDS_DONE_WRITING:
490         case MDS_PIN:
491         case MDS_SYNC:
492         case MDS_GETXATTR:
493         case MDS_SETXATTR:
494         case MDS_SET_INFO:
495         case MDS_QUOTACHECK:
496         case MDS_QUOTACTL:
497         case QUOTA_DQACQ:
498         case QUOTA_DQREL:
499                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
500                 if (rc)
501                         CERROR("bad opc %u version %08x, expecting %08x\n",
502                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
503                 break;
504         case LDLM_ENQUEUE:
505         case LDLM_CONVERT:
506         case LDLM_BL_CALLBACK:
507         case LDLM_CP_CALLBACK:
508                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
509                 if (rc)
510                         CERROR("bad opc %u version %08x, expecting %08x\n",
511                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
512                 break;
513         case OBD_LOG_CANCEL:
514         case LLOG_ORIGIN_HANDLE_CREATE:
515         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
516         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
517         case LLOG_ORIGIN_HANDLE_READ_HEADER:
518         case LLOG_ORIGIN_HANDLE_CLOSE:
519         case LLOG_CATINFO:
520                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
521                 if (rc)
522                         CERROR("bad opc %u version %08x, expecting %08x\n",
523                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
524                 break;
525         default:
526                 CERROR("MDS unknown opcode %d\n", msg->opc);
527                 rc = -ENOTSUPP;
528         }
529         return rc;
530 }
531
532 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
533                                        struct obd_device *obd, int *process)
534 {
535         switch (req->rq_reqmsg->opc) {
536         case MDS_CONNECT: /* This will never get here, but for completeness. */
537         case OST_CONNECT: /* This will never get here, but for completeness. */
538         case MDS_DISCONNECT:
539         case OST_DISCONNECT:
540                *process = 1;
541                RETURN(0);
542
543         case MDS_CLOSE:
544         case MDS_SYNC: /* used in unmounting */
545         case OBD_PING:
546         case MDS_REINT:
547         case LDLM_ENQUEUE:
548                 *process = target_queue_recovery_request(req, obd);
549                 RETURN(0);
550
551         default:
552                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
553                 *process = 0;
554                 /* XXX what should we set rq_status to here? */
555                 req->rq_status = -EAGAIN;
556                 RETURN(ptlrpc_error(req));
557         }
558 }
559
560 /*
561  * Handle recovery. Return:
562  *       +ve: continue request processing;
563  *       -ve: abort immediately with given (negated) error code;
564  *         0: send reply with error code in req->rq_status;
565  */
566 static int mdt_recovery(struct ptlrpc_request *req)
567 {
568         int recovering;
569         int abort_recovery;
570         struct obd_device *obd;
571
572         ENTRY;
573
574         if (req->rq_reqmsg->opc == MDS_CONNECT)
575                 RETURN(+1);
576
577         if (req->rq_export == NULL) {
578                 CERROR("operation %d on unconnected MDS from %s\n",
579                        req->rq_reqmsg->opc,
580                        libcfs_id2str(req->rq_peer));
581                 req->rq_status = -ENOTCONN;
582                 RETURN(0);
583         }
584
585         /* sanity check: if the xid matches, the request must be marked as a
586          * resent or replayed */
587         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
588                       lustre_msg_get_flags(req->rq_reqmsg) &
589                       (MSG_RESENT | MSG_REPLAY)),
590                  "rq_xid "LPU64" matches last_xid, "
591                  "expected RESENT flag\n", req->rq_xid);
592
593         /* else: note the opposite is not always true; a RESENT req after a
594          * failover will usually not match the last_xid, since it was likely
595          * never committed. A REPLAYed request will almost never match the
596          * last xid, however it could for a committed, but still retained,
597          * open. */
598
599         obd = req->rq_export->exp_obd;
600
601         /* Check for aborted recovery... */
602         spin_lock_bh(&obd->obd_processing_task_lock);
603         abort_recovery = obd->obd_abort_recovery;
604         recovering = obd->obd_recovering;
605         spin_unlock_bh(&obd->obd_processing_task_lock);
606         if (abort_recovery) {
607                 target_abort_recovery(obd);
608         } else if (recovering) {
609                 int rc;
610                 int should_process;
611
612                 rc = mdt_filter_recovery_request(req, obd, &should_process);
613                 if (rc != 0 || !should_process) {
614                         LASSERT(rc < 0);
615                         RETURN(rc);
616                 }
617         }
618         RETURN(+1);
619 }
620
621 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
622 {
623         struct obd_device *obd;
624
625         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
626                 if (req->rq_reqmsg->opc != OBD_PING)
627                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
628
629                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
630                 if (obd && obd->obd_recovering) {
631                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
632                         RETURN(target_queue_final_reply(req, req->rq_status));
633                 } else {
634                         /* Lost a race with recovery; let the error path
635                          * DTRT. */
636                         req->rq_status = -ENOTCONN;
637                 }
638         }
639         target_send_reply(req, req->rq_status, info->mti_fail_id);
640         RETURN(req->rq_status);
641 }
642
643 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
644 {
645         struct lustre_msg *msg;
646         int                result;
647
648         ENTRY;
649
650         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
651
652         LASSERT(current->journal_info == NULL);
653
654         msg = req->rq_reqmsg;
655         result = mds_msg_check_version(msg);
656         if (result != 0) {
657                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
658                 RETURN(result);
659         }
660
661         result = mdt_recovery(req);
662         if (result > 0) {
663                 struct mdt_handler *h;
664
665                 h = mdt_handler_find(msg->opc);
666                 if (h != NULL)
667                         result = mdt_req_handle(info, h, req, 0);
668                 else {
669                         req->rq_status = -ENOTSUPP;
670                         result = ptlrpc_error(req);
671                         RETURN(result);
672                 }
673         } else if (result < 0)
674                 RETURN(result);
675
676         RETURN(mdt_reply(req, info));
677 }
678
679 static struct mdt_device *mdt_dev(struct lu_device *d)
680 {
681         LASSERT(lu_device_is_mdt(d));
682         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
683 }
684
685 static int mdt_handle(struct ptlrpc_request *req)
686 {
687         int result;
688
689         struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
690                                                              &mdt_thread_key);
691         mdt_thread_info_init(info);
692         info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
693
694         result = mdt_handle0(req, info);
695
696         mdt_thread_info_fini(info);
697         return result;
698 }
699
700 static int mdt_intent_policy(struct ldlm_namespace *ns,
701                              struct ldlm_lock **lockp, void *req_cookie,
702                              ldlm_mode_t mode, int flags, void *data)
703 {
704         ENTRY;
705         RETURN(ELDLM_LOCK_ABORTED);
706 }
707
708 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
709                                             svc_handler_t h, char *name,
710                                             struct proc_dir_entry *proc_entry,
711                                             svcreq_printfn_t prntfn)
712 {
713         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
714                                c->psc_max_req_size, c->psc_max_reply_size,
715                                c->psc_req_portal, c->psc_rep_portal,
716                                c->psc_watchdog_timeout,
717                                h, name, proc_entry,
718                                prntfn, c->psc_num_threads);
719 }
720
721 int md_device_init(struct md_device *md, struct lu_device_type *t)
722 {
723         return lu_device_init(&md->md_lu_dev, t);
724 }
725
726 void md_device_fini(struct md_device *md)
727 {
728         lu_device_fini(&md->md_lu_dev);
729 }
730
731 static void mdt_fini(struct lu_device *d)
732 {
733         struct mdt_device *m = mdt_dev(d);
734
735         if (d->ld_site != NULL) {
736                 lu_site_fini(d->ld_site);
737                 d->ld_site = NULL;
738         }
739         if (m->mdt_service != NULL) {
740                 ptlrpc_unregister_service(m->mdt_service);
741                 m->mdt_service = NULL;
742         }
743         if (m->mdt_namespace != NULL) {
744                 ldlm_namespace_free(m->mdt_namespace, 0);
745                 m->mdt_namespace = NULL;
746         }
747
748         LASSERT(atomic_read(&d->ld_ref) == 0);
749         md_device_fini(&m->mdt_md_dev);
750 }
751
752 static int mdt_init0(struct mdt_device *m,
753                      struct lu_device_type *t, struct lustre_cfg *cfg)
754 {
755         struct lu_site *s;
756         char   ns_name[48];
757
758         ENTRY;
759
760         OBD_ALLOC_PTR(s);
761         if (s == NULL)
762                 return -ENOMEM;
763
764         md_device_init(&m->mdt_md_dev, t);
765
766         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
767
768         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
769         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
770         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
771         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
772         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
773         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
774         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
775         /*
776          * We'd like to have a mechanism to set this on a per-device basis,
777          * but alas...
778          */
779         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
780                                                       MDT_MIN_THREADS),
781                                                   MDT_MAX_THREADS);
782         lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
783
784         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
785         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
786         if (m->mdt_namespace == NULL)
787                 return -ENOMEM;
788         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
789
790         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
791                            "mdt_ldlm_client", &m->mdt_ldlm_client);
792
793         m->mdt_service =
794                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
795                                      LUSTRE_MDT0_NAME,
796                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
797                                      NULL);
798         if (m->mdt_service == NULL)
799                 return -ENOMEM;
800
801         return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
802 }
803
804 struct lu_object *mdt_object_alloc(struct lu_device *d)
805 {
806         struct mdt_object *mo;
807
808         OBD_ALLOC_PTR(mo);
809         if (mo != NULL) {
810                 struct lu_object *o;
811                 struct lu_object_header *h;
812
813                 o = &mo->mot_obj.mo_lu;
814                 h = &mo->mot_header;
815                 lu_object_header_init(h);
816                 lu_object_init(o, h, d);
817                 /* ->lo_depth and ->lo_flags are automatically 0 */
818                 lu_object_add_top(h, o);
819                 return o;
820         } else
821                 return NULL;
822 }
823
824 int mdt_object_init(struct lu_object *o)
825 {
826         struct mdt_device *d = mdt_dev(o->lo_dev);
827         struct lu_device  *under;
828         struct lu_object  *below;
829
830         under = &d->mdt_child->md_lu_dev;
831         below = under->ld_ops->ldo_object_alloc(under);
832         if (below != NULL) {
833                 lu_object_add(o, below);
834                 return 0;
835         } else
836                 return -ENOMEM;
837 }
838
839 void mdt_object_free(struct lu_object *o)
840 {
841         struct lu_object_header *h;
842
843         h = o->lo_header;
844         lu_object_fini(o);
845         lu_object_header_fini(h);
846 }
847
848 void mdt_object_release(struct lu_object *o)
849 {
850 }
851
852 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
853 {
854         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
855 }
856
857 static struct lu_device_operations mdt_lu_ops = {
858         .ldo_object_alloc   = mdt_object_alloc,
859         .ldo_object_init    = mdt_object_init,
860         .ldo_object_free    = mdt_object_free,
861         .ldo_object_release = mdt_object_release,
862         .ldo_object_print   = mdt_object_print
863 };
864
865 struct md_object *mdt_object_child(struct mdt_object *o)
866 {
867         return lu2md(lu_object_next(&o->mot_obj.mo_lu));
868 }
869
870 static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
871 {
872         return d->mdt_child->md_ops;
873 }
874
875 int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
876               struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
877 {
878         struct mdt_object      *o;
879         struct mdt_object      *child;
880         struct mdt_lock_handle *lh;
881
882         int result;
883
884         lh = &info->mti_lh[MDT_LH_PARENT];
885         lh->mlh_mode = LCK_PW;
886
887         o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
888         if (IS_ERR(o))
889                 return PTR_ERR(o);
890
891         child = mdt_object_find(d, cfid);
892         if (!IS_ERR(child)) {
893                 result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
894                                                      mdt_object_child(child));
895                 mdt_object_put(child);
896         } else
897                 result = PTR_ERR(child);
898         mdt_object_unlock(d->mdt_namespace, o, lh);
899         mdt_object_put(o);
900         return result;
901 }
902
903 static struct obd_ops mdt_obd_device_ops = {
904         .o_owner = THIS_MODULE
905 };
906
907 struct lu_device *mdt_device_alloc(struct lu_device_type *t,
908                                    struct lustre_cfg *cfg)
909 {
910         struct lu_device  *l;
911         struct mdt_device *m;
912
913         OBD_ALLOC_PTR(m);
914         if (m != NULL) {
915                 int result;
916
917                 l = &m->mdt_md_dev.md_lu_dev;
918                 result = mdt_init0(m, t, cfg);
919                 if (result != 0) {
920                         mdt_fini(l);
921                         m = ERR_PTR(result);
922                 }
923         } else
924                 l = ERR_PTR(-ENOMEM);
925         return l;
926 }
927
928 void mdt_device_free(struct lu_device *m)
929 {
930         mdt_fini(m);
931         OBD_FREE_PTR(m);
932 }
933
934 void *mdt_thread_init(struct ptlrpc_thread *t)
935 {
936         struct mdt_thread_info *info;
937
938         return OBD_ALLOC_PTR(info) ? : ERR_PTR(-ENOMEM);
939 }
940
941 void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
942 {
943         struct mdt_thread_info *info = data;
944         OBD_FREE_PTR(info);
945 }
946
947 static struct ptlrpc_thread_key mdt_thread_key = {
948         .ptk_init = mdt_thread_init,
949         .ptk_fini = mdt_thread_fini
950 };
951
952 int mdt_type_init(struct lu_device_type *t)
953 {
954         return ptlrpc_thread_key_register(&mdt_thread_key);
955 }
956
957 void mdt_type_fini(struct lu_device_type *t)
958 {
959 }
960
961 static struct lu_device_type_operations mdt_device_type_ops = {
962         .ldto_init = mdt_type_init,
963         .ldto_fini = mdt_type_fini,
964
965         .ldto_device_alloc = mdt_device_alloc,
966         .ldto_device_free  = mdt_device_free
967 };
968
969 static struct lu_device_type mdt_device_type = {
970         .ldt_tags = LU_DEVICE_MD,
971         .ldt_name = LUSTRE_MDT0_NAME,
972         .ldt_ops  = &mdt_device_type_ops
973 };
974
975 struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
976         { 0 }
977 };
978
979 struct lprocfs_vars lprocfs_mdt_module_vars[] = {
980         { 0 }
981 };
982
983 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
984
985 static int __init mdt_mod_init(void)
986 {
987         struct lprocfs_static_vars lvars;
988         struct obd_type *type;
989         int result;
990
991         mdt_num_threads = MDT_NUM_THREADS;
992         lprocfs_init_vars(mdt, &lvars);
993         result = class_register_type(&mdt_obd_device_ops,
994                                      lvars.module_vars, LUSTRE_MDT0_NAME);
995         if (result == 0) {
996                 type = class_get_type(LUSTRE_MDT0_NAME);
997                 LASSERT(type != NULL);
998                 type->typ_lu = &mdt_device_type;
999                 result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
1000                 if (result != 0)
1001                         class_unregister_type(LUSTRE_MDT0_NAME);
1002         }
1003         return result;
1004 }
1005
1006 static void __exit mdt_mod_exit(void)
1007 {
1008         class_unregister_type(LUSTRE_MDT0_NAME);
1009 }
1010
1011
1012 #define DEF_HNDL(prefix, base, flags, opc, fn)                        \
1013 [prefix ## _ ## opc - prefix ## _ ## base] = {                        \
1014         .mh_name    = #opc,                                        \
1015         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET,        \
1016         .mh_opc     = prefix ## _  ## opc,                        \
1017         .mh_flags   = flags,                                        \
1018         .mh_act     = fn                                        \
1019 }
1020
1021 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
1022
1023 static struct mdt_handler mdt_mds_ops[] = {
1024         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1025         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1026         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1027         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1028         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1029         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1030         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1031         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1032         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1033         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1034         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1035         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1036         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1037         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1038         DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
1039         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1040         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl),
1041 };
1042
1043 static struct mdt_handler mdt_obd_ops[] = {
1044 };
1045
1046 static struct mdt_handler mdt_dlm_ops[] = {
1047 };
1048
1049 static struct mdt_handler mdt_llog_ops[] = {
1050 };
1051
1052 static struct mdt_opc_slice mdt_handlers[] = {
1053         {
1054                 .mos_opc_start = MDS_GETATTR,
1055                 .mos_opc_end   = MDS_LAST_OPC,
1056                 .mos_hs        = mdt_mds_ops
1057         },
1058         {
1059                 .mos_opc_start = OBD_PING,
1060                 .mos_opc_end   = OBD_LAST_OPC,
1061                 .mos_hs        = mdt_obd_ops
1062         },
1063         {
1064                 .mos_opc_start = LDLM_ENQUEUE,
1065                 .mos_opc_end   = LDLM_LAST_OPC,
1066                 .mos_hs        = mdt_dlm_ops
1067         },
1068         {
1069                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1070                 .mos_opc_end   = LLOG_LAST_OPC,
1071                 .mos_hs        = mdt_llog_ops
1072         },
1073         {
1074                 .mos_hs        = NULL
1075         }
1076 };
1077
1078 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1079 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1080 MODULE_LICENSE("GPL");
1081
1082 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1083                 "number of mdt service threads to start");
1084
1085 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);