Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
84
85         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 exp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, exp, localdata);
94
95         if (rc) {
96                 class_disconnect(exp);
97         } else {
98                 class_export_put(exp);
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data,
107                          void *localdata)
108 {
109         ENTRY;
110
111         if (exp == NULL || obd == NULL || cluuid == NULL)
112                 RETURN(-EINVAL);
113
114         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
115
116         if (data != NULL) {
117                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
118                 exp->exp_connect_flags = data->ocd_connect_flags;
119                 data->ocd_version = LUSTRE_VERSION_CODE;
120         }
121
122         RETURN(0);
123 }
124
125 static int mgs_disconnect(struct obd_export *exp)
126 {
127         int rc;
128         ENTRY;
129
130         LASSERT(exp);
131
132         class_export_get(exp);
133         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
134
135         /* Disconnect early so that clients can't keep using export */
136         rc = class_disconnect(exp);
137         ldlm_cancel_locks_for_export(exp);
138
139         lprocfs_exp_cleanup(exp);
140
141         /* complete all outstanding replies */
142         spin_lock(&exp->exp_lock);
143         while (!list_empty(&exp->exp_outstanding_replies)) {
144                 struct ptlrpc_reply_state *rs =
145                         list_entry(exp->exp_outstanding_replies.next,
146                                    struct ptlrpc_reply_state, rs_exp_list);
147                 struct ptlrpc_service *svc = rs->rs_service;
148
149                 spin_lock(&svc->srv_lock);
150                 list_del_init(&rs->rs_exp_list);
151                 ptlrpc_schedule_difficult_reply(rs);
152                 spin_unlock(&svc->srv_lock);
153         }
154         spin_unlock(&exp->exp_lock);
155
156         class_export_put(exp);
157         RETURN(rc);
158 }
159
160 static int mgs_cleanup(struct obd_device *obd);
161 static int mgs_handle(struct ptlrpc_request *req);
162
163 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
164                          struct obd_device *tgt, int count,
165                          struct llog_catid *logid, struct obd_uuid *uuid)
166 {
167         int rc;
168         ENTRY;
169
170         LASSERT(olg == &obd->obd_olg);
171         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
172                         &llog_lvfs_ops);
173         RETURN(rc);
174 }
175
176 static int mgs_llog_finish(struct obd_device *obd, int count)
177 {
178         struct llog_ctxt *ctxt;
179         int rc = 0;
180         ENTRY;
181
182         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
183         if (ctxt)
184                 rc = llog_cleanup(ctxt);
185
186         RETURN(rc);
187 }
188
189 /* Start the MGS obd */
190 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
191 {
192         struct lprocfs_static_vars lvars;
193         struct mgs_obd *mgs = &obd->u.mgs;
194         struct lustre_mount_info *lmi;
195         struct lustre_sb_info *lsi;
196         struct vfsmount *mnt;
197         int rc = 0;
198         ENTRY;
199
200         CDEBUG(D_CONFIG, "Starting MGS\n");
201
202         /* Find our disk */
203         lmi = server_get_mount(obd->obd_name);
204         if (!lmi)
205                 RETURN(rc = -EINVAL);
206
207         mnt = lmi->lmi_mnt;
208         lsi = s2lsi(lmi->lmi_sb);
209         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
210         if (IS_ERR(obd->obd_fsops))
211                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
212
213         /* namespace for mgs llog */
214         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
215                                                 LDLM_NAMESPACE_MODEST);
216         if (obd->obd_namespace == NULL)
217                 GOTO(err_ops, rc = -ENOMEM);
218
219         /* ldlm setup */
220         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
221                            "mgs_ldlm_client", &obd->obd_ldlm_client);
222
223         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
224
225         rc = mgs_fs_setup(obd, mnt);
226         if (rc) {
227                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
228                        obd->obd_name, rc);
229                 GOTO(err_ns, rc);
230         }
231
232         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
233         if (rc)
234                 GOTO(err_fs, rc);
235
236         /* No recovery for MGC's */
237         obd->obd_replayable = 0;
238
239         /* Internal mgs setup */
240         mgs_init_fsdb_list(obd);
241         sema_init(&mgs->mgs_sem, 1);
242
243         /* Start the service threads */
244         mgs->mgs_service =
245                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
246                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
247                                 MGC_REPLY_PORTAL, 2000,
248                                 mgs_handle, LUSTRE_MGS_NAME,
249                                 obd->obd_proc_entry, target_print_req,
250                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
251                                 "ll_mgs", LCT_MD_THREAD);
252
253         if (!mgs->mgs_service) {
254                 CERROR("failed to start service\n");
255                 GOTO(err_llog, rc = -ENOMEM);
256         }
257
258         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
259         if (rc)
260                 GOTO(err_thread, rc);
261
262         /* Setup proc */
263         lprocfs_mgs_init_vars(&lvars);
264         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
265                 lproc_mgs_setup(obd);
266         }
267
268         ping_evictor_start();
269
270         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
271
272         RETURN(0);
273
274 err_thread:
275         ptlrpc_unregister_service(mgs->mgs_service);
276 err_llog:
277         obd_llog_finish(obd, 0);
278 err_fs:
279         /* No extra cleanup needed for llog_init_commit_thread() */
280         mgs_fs_cleanup(obd);
281 err_ns:
282         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
283         obd->obd_namespace = NULL;
284 err_ops:
285         fsfilt_put_ops(obd->obd_fsops);
286 err_put:
287         server_put_mount(obd->obd_name, mnt);
288         mgs->mgs_sb = 0;
289         return rc;
290 }
291
292 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
293 {
294         int rc = 0;
295         ENTRY;
296
297         switch (stage) {
298         case OBD_CLEANUP_EARLY:
299         case OBD_CLEANUP_EXPORTS:
300                 rc = obd_llog_finish(obd, 0);
301                 break;
302         }
303         RETURN(rc);
304 }
305
306 /**
307  * Performs cleanup procedures for passed \a obd given it is mgs obd.
308  */
309 static int mgs_cleanup(struct obd_device *obd)
310 {
311         struct mgs_obd *mgs = &obd->u.mgs;
312         ENTRY;
313
314         if (mgs->mgs_sb == NULL)
315                 RETURN(0);
316
317         ping_evictor_stop();
318
319         ptlrpc_unregister_service(mgs->mgs_service);
320
321         mgs_cleanup_fsdb_list(obd);
322         lproc_mgs_cleanup(obd);
323         mgs_fs_cleanup(obd);
324
325         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
326         mgs->mgs_sb = NULL;
327
328         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
329         obd->obd_namespace = NULL;
330
331         fsfilt_put_ops(obd->obd_fsops);
332
333         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
334         RETURN(0);
335 }
336
337 /* similar to filter_prepare_destroy */
338 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
339                             struct lustre_handle *lockh)
340 {
341         struct ldlm_res_id res_id;
342         int rc, flags = 0;
343         ENTRY;
344
345         rc = mgc_fsname2resid(fsname, &res_id);
346         if (!rc)
347                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
348                                             LDLM_PLAIN, NULL, LCK_EX,
349                                             &flags, ldlm_blocking_ast,
350                                             ldlm_completion_ast, NULL,
351                                             fsname, 0, NULL, lockh);
352         if (rc)
353                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
354
355         RETURN(rc);
356 }
357
358 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
359 {
360         ENTRY;
361         ldlm_lock_decref(lockh, LCK_EX);
362         RETURN(0);
363 }
364
365 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
366                             struct lustre_handle *lockh)
367 {
368         int lockrc;
369
370         if (fsname[0]) {
371                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
372                 if (lockrc != ELDLM_OK)
373                         CERROR("lock error %d for fs %s\n", lockrc,
374                                fsname);
375                 else
376                         mgs_put_cfg_lock(lockh);
377         }
378 }
379
380 /* rc=0 means ok
381       1 means update
382      <0 means error */
383 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
384 {
385         int rc;
386         ENTRY;
387
388         rc = mgs_check_index(obd, mti);
389         if (rc == 0) {
390                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
391                                    "this MGS does not know about it, preventing "
392                                    "registration.\n", mti->mti_svname);
393                 rc = -ENOENT;
394         } else if (rc == -1) {
395                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
396                                    "disappeared! Regenerating all logs.\n",
397                                    mti->mti_fsname);
398                 mti->mti_flags |= LDD_F_WRITECONF;
399                 rc = 1;
400         } else {
401                 /* Index is correctly marked as used */
402
403                 /* If the logs don't contain the mti_nids then add
404                    them as failover nids */
405                 rc = mgs_check_failnid(obd, mti);
406         }
407
408         RETURN(rc);
409 }
410
411 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
412 static int mgs_handle_target_reg(struct ptlrpc_request *req)
413 {
414         struct obd_device *obd = req->rq_export->exp_obd;
415         struct lustre_handle lockh;
416         struct mgs_target_info *mti, *rep_mti;
417         int rc = 0, lockrc;
418         ENTRY;
419
420         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
421
422         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
423         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
424                                 LDD_F_UPDATE))) {
425                 /* We're just here as a startup ping. */
426                 CDEBUG(D_MGS, "Server %s is running on %s\n",
427                        mti->mti_svname, obd_export_nid2str(req->rq_export));
428                 rc = mgs_check_target(obd, mti);
429                 /* above will set appropriate mti flags */
430                 if (rc <= 0)
431                         /* Nothing wrong, or fatal error */
432                         GOTO(out_nolock, rc);
433         }
434
435         /* Revoke the config lock to make sure nobody is reading. */
436         /* Although actually I think it should be alright if
437            someone was reading while we were updating the logs - if we
438            revoke at the end they will just update from where they left off. */
439         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
440         if (lockrc != ELDLM_OK) {
441                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
442                                    "update their configuration (%d). Updating "
443                                    "local logs anyhow; you might have to "
444                                    "manually restart other nodes to get the "
445                                    "latest configuration.\n",
446                                    obd->obd_name, lockrc);
447         }
448
449         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
450
451         /* Log writing contention is handled by the fsdb_sem */
452
453         if (mti->mti_flags & LDD_F_WRITECONF) {
454                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
455                     mti->mti_stripe_index == 0) {
456                         rc = mgs_erase_logs(obd, mti->mti_fsname);
457                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
458                                       "request.  All servers must be restarted "
459                                       "in order to regenerate the logs."
460                                       "\n", obd->obd_name, mti->mti_fsname);
461                 } else if (mti->mti_flags &
462                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
463                         rc = mgs_erase_log(obd, mti->mti_svname);
464                         LCONSOLE_WARN("%s: Regenerating %s log by user "
465                                       "request.\n",
466                                       obd->obd_name, mti->mti_svname);
467                 }
468                 mti->mti_flags |= LDD_F_UPDATE;
469                 /* Erased logs means start from scratch. */
470                 mti->mti_flags &= ~LDD_F_UPGRADE14;
471         }
472
473         /* COMPAT_146 */
474         if (mti->mti_flags & LDD_F_UPGRADE14) {
475                 rc = mgs_upgrade_sv_14(obd, mti);
476                 if (rc) {
477                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
478                         GOTO(out, rc);
479                 }
480
481                 /* We're good to go */
482                 mti->mti_flags |= LDD_F_UPDATE;
483         }
484         /* end COMPAT_146 */
485
486         if (mti->mti_flags & LDD_F_UPDATE) {
487                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
488                        mti->mti_stripe_index);
489
490                 /* create or update the target log
491                    and update the client/mdt logs */
492                 rc = mgs_write_log_target(obd, mti);
493                 if (rc) {
494                         CERROR("Failed to write %s log (%d)\n",
495                                mti->mti_svname, rc);
496                         GOTO(out, rc);
497                 }
498
499                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
500                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
501                                     LDD_F_UPGRADE14);
502                 mti->mti_flags |= LDD_F_REWRITE_LDD;
503         }
504
505 out:
506         /* done with log update */
507         if (lockrc == ELDLM_OK)
508                 mgs_put_cfg_lock(&lockh);
509 out_nolock:
510         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
511                mti->mti_stripe_index, rc);
512         rc = req_capsule_server_pack(&req->rq_pill);
513         if (rc)
514                 RETURN(rc);
515
516         /* send back the whole mti in the reply */
517         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
518         *rep_mti = *mti;
519
520         /* Flush logs to disk */
521         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
522         RETURN(rc);
523 }
524
525 static int mgs_set_info_rpc(struct ptlrpc_request *req)
526 {
527         struct obd_device *obd = req->rq_export->exp_obd;
528         struct mgs_send_param *msp, *rep_msp;
529         struct lustre_handle lockh;
530         int rc;
531         struct lustre_cfg_bufs bufs;
532         struct lustre_cfg *lcfg;
533         char fsname[MTI_NAME_MAXLEN];
534         ENTRY;
535
536         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
537         LASSERT(msp);
538
539         /* Construct lustre_cfg structure to pass to function mgs_setparam */
540         lustre_cfg_bufs_reset(&bufs, NULL);
541         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
542         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
543         rc = mgs_setparam(obd, lcfg, fsname);
544         if (rc) {
545                 CERROR("Error %d in setting the parameter %s for fs %s\n",
546                        rc, msp->mgs_param, fsname);
547                 RETURN(rc);
548         }
549
550         /* request for update */
551         mgs_revoke_lock(obd, fsname, &lockh);
552
553         lustre_cfg_free(lcfg);
554
555         rc = req_capsule_server_pack(&req->rq_pill);
556         if (rc == 0) {
557                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
558                 rep_msp = msp;
559         }
560         RETURN(rc);
561 }
562
563 /* Called whenever a target cleans up. */
564 /* XXX - Currently unused */
565 static int mgs_handle_target_del(struct ptlrpc_request *req)
566 {
567         ENTRY;
568         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
569         RETURN(0);
570 }
571
572 /* XXX - Currently unused */
573 static int mgs_handle_exception(struct ptlrpc_request *req)
574 {
575         ENTRY;
576         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
577         RETURN(0);
578 }
579
580 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
581 int mgs_handle(struct ptlrpc_request *req)
582 {
583         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
584         int opc, rc = 0;
585         ENTRY;
586
587         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
588         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
589         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
590                 RETURN(0);
591
592         LASSERT(current->journal_info == NULL);
593         opc = lustre_msg_get_opc(req->rq_reqmsg);
594         if (opc != MGS_CONNECT) {
595                 if (req->rq_export == NULL) {
596                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
597                                opc);
598                         req->rq_status = -ENOTCONN;
599                         GOTO(out, rc = -ENOTCONN);
600                 }
601         }
602
603         switch (opc) {
604         case MGS_CONNECT:
605                 DEBUG_REQ(D_MGS, req, "connect");
606                 /* MGS and MDS have same request format for connect */
607                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
608                 rc = target_handle_connect(req);
609                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
610                         /* Make clients trying to reconnect after a MGS restart
611                            happy; also requires obd_replayable */
612                         lustre_msg_add_op_flags(req->rq_repmsg,
613                                                 MSG_CONNECT_RECONNECT);
614                 break;
615         case MGS_DISCONNECT:
616                 DEBUG_REQ(D_MGS, req, "disconnect");
617                 /* MGS and MDS have same request format for disconnect */
618                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
619                 rc = target_handle_disconnect(req);
620                 req->rq_status = rc;            /* superfluous? */
621                 break;
622         case MGS_EXCEPTION:
623                 DEBUG_REQ(D_MGS, req, "exception");
624                 rc = mgs_handle_exception(req);
625                 break;
626         case MGS_TARGET_REG:
627                 DEBUG_REQ(D_MGS, req, "target add");
628                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
629                 rc = mgs_handle_target_reg(req);
630                 break;
631         case MGS_TARGET_DEL:
632                 DEBUG_REQ(D_MGS, req, "target del");
633                 rc = mgs_handle_target_del(req);
634                 break;
635         case MGS_SET_INFO:
636                 DEBUG_REQ(D_MGS, req, "set_info");
637                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
638                 rc = mgs_set_info_rpc(req);
639                 break;
640
641         case LDLM_ENQUEUE:
642                 DEBUG_REQ(D_MGS, req, "enqueue");
643                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
644                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
645                                          ldlm_server_blocking_ast, NULL);
646                 break;
647         case LDLM_BL_CALLBACK:
648         case LDLM_CP_CALLBACK:
649                 DEBUG_REQ(D_MGS, req, "callback");
650                 CERROR("callbacks should not happen on MGS\n");
651                 LBUG();
652                 break;
653
654         case OBD_PING:
655                 DEBUG_REQ(D_INFO, req, "ping");
656                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
657                 rc = target_handle_ping(req);
658                 break;
659         case OBD_LOG_CANCEL:
660                 DEBUG_REQ(D_MGS, req, "log cancel");
661                 rc = -ENOTSUPP; /* la la la */
662                 break;
663
664         case LLOG_ORIGIN_HANDLE_CREATE:
665                 DEBUG_REQ(D_MGS, req, "llog_init");
666                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
667                 rc = llog_origin_handle_create(req);
668                 break;
669         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
670                 DEBUG_REQ(D_MGS, req, "llog next block");
671                 req_capsule_set(&req->rq_pill,
672                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
673                 rc = llog_origin_handle_next_block(req);
674                 break;
675         case LLOG_ORIGIN_HANDLE_READ_HEADER:
676                 DEBUG_REQ(D_MGS, req, "llog read header");
677                 req_capsule_set(&req->rq_pill,
678                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
679                 rc = llog_origin_handle_read_header(req);
680                 break;
681         case LLOG_ORIGIN_HANDLE_CLOSE:
682                 DEBUG_REQ(D_MGS, req, "llog close");
683                 rc = llog_origin_handle_close(req);
684                 break;
685         case LLOG_CATINFO:
686                 DEBUG_REQ(D_MGS, req, "llog catinfo");
687                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
688                 rc = llog_catinfo(req);
689                 break;
690         default:
691                 req->rq_status = -ENOTSUPP;
692                 rc = ptlrpc_error(req);
693                 RETURN(rc);
694         }
695
696         LASSERT(current->journal_info == NULL);
697
698         if (rc)
699                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
700
701 out:
702         target_send_reply(req, rc, fail);
703         RETURN(0);
704 }
705
706 static inline int mgs_init_export(struct obd_export *exp)
707 {
708         return ldlm_init_export(exp);
709 }
710
711 static inline int mgs_destroy_export(struct obd_export *exp)
712 {
713         ENTRY;
714
715         target_destroy_export(exp);
716         mgs_client_free(exp);
717         ldlm_destroy_export(exp);
718
719         RETURN(0);
720 }
721
722 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
723 {
724         char *ptr;
725
726         ENTRY;
727         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
728                 *fsname = *ptr;
729                 fsname++;
730         }
731         if (*ptr == '\0')
732                 return -EINVAL;
733         *fsname = '\0';
734         ptr++;
735         strcpy(poolname, ptr);
736
737         RETURN(0);
738 }
739
740 static int mgs_iocontrol_pool(struct obd_device *obd,
741                               struct obd_ioctl_data *data)
742 {
743         int rc;
744         struct lustre_handle lockh;
745         struct lustre_cfg *lcfg = NULL;
746         struct llog_rec_hdr rec;
747         char *fsname = NULL;
748         char *poolname = NULL;
749         ENTRY;
750
751         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
752         if (fsname == NULL)
753                 RETURN(-ENOMEM);
754
755         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
756         if (poolname == NULL) {
757                 rc = -ENOMEM;
758                 GOTO(out_pool, rc);
759         }
760         rec.lrh_len = llog_data_len(data->ioc_plen1);
761
762         if (data->ioc_type == LUSTRE_CFG_TYPE) {
763                 rec.lrh_type = OBD_CFG_REC;
764         } else {
765                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
766                 rc = -EINVAL;
767                 GOTO(out_pool, rc);
768         }
769
770         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
771                 rc = -E2BIG;
772                 GOTO(out_pool, rc);
773         }
774
775         OBD_ALLOC(lcfg, data->ioc_plen1);
776         if (lcfg == NULL)
777                 GOTO(out_pool, rc = -ENOMEM);
778
779         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
780                 GOTO(out_pool, rc = -EFAULT);
781
782         if (lcfg->lcfg_bufcount < 2) {
783                 GOTO(out_pool, rc = -EFAULT);
784         }
785
786         /* first arg is always <fsname>.<poolname> */
787         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
788                             poolname);
789
790         switch (lcfg->lcfg_command) {
791         case LCFG_POOL_NEW: {
792                 if (lcfg->lcfg_bufcount != 2)
793                         RETURN(-EINVAL);
794                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
795                                   poolname, NULL);
796                 break;
797         }
798         case LCFG_POOL_ADD: {
799                 if (lcfg->lcfg_bufcount != 3)
800                         RETURN(-EINVAL);
801                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
802                                   lustre_cfg_string(lcfg, 2));
803                 break;
804         }
805         case LCFG_POOL_REM: {
806                 if (lcfg->lcfg_bufcount != 3)
807                         RETURN(-EINVAL);
808                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
809                                   lustre_cfg_string(lcfg, 2));
810                 break;
811         }
812         case LCFG_POOL_DEL: {
813                 if (lcfg->lcfg_bufcount != 2)
814                         RETURN(-EINVAL);
815                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
816                                   poolname, NULL);
817                 break;
818         }
819         default: {
820                  rc = -EINVAL;
821                  GOTO(out_pool, rc);
822         }
823         }
824
825         if (rc) {
826                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
827                        rc, lcfg->lcfg_command, fsname, poolname);
828                 GOTO(out_pool, rc);
829         }
830
831         /* request for update */
832         mgs_revoke_lock(obd, fsname, &lockh);
833
834 out_pool:
835         if (lcfg != NULL)
836                 OBD_FREE(lcfg, data->ioc_plen1);
837
838         if (fsname != NULL)
839                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
840
841         if (poolname != NULL)
842                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
843
844         RETURN(rc);
845 }
846
847 /* from mdt_iocontrol */
848 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
849                   void *karg, void *uarg)
850 {
851         struct obd_device *obd = exp->exp_obd;
852         struct obd_ioctl_data *data = karg;
853         struct lvfs_run_ctxt saved;
854         int rc = 0;
855
856         ENTRY;
857         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
858
859         switch (cmd) {
860
861         case OBD_IOC_PARAM: {
862                 struct lustre_handle lockh;
863                 struct lustre_cfg *lcfg;
864                 struct llog_rec_hdr rec;
865                 char fsname[MTI_NAME_MAXLEN];
866
867                 rec.lrh_len = llog_data_len(data->ioc_plen1);
868
869                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
870                         rec.lrh_type = OBD_CFG_REC;
871                 } else {
872                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
873                         RETURN(-EINVAL);
874                 }
875
876                 OBD_ALLOC(lcfg, data->ioc_plen1);
877                 if (lcfg == NULL)
878                         RETURN(-ENOMEM);
879                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
880                         GOTO(out_free, rc = -EFAULT);
881
882                 if (lcfg->lcfg_bufcount < 1)
883                         GOTO(out_free, rc = -EINVAL);
884
885                 rc = mgs_setparam(obd, lcfg, fsname);
886                 if (rc) {
887                         CERROR("setparam err %d\n", rc);
888                         GOTO(out_free, rc);
889                 }
890
891                 /* Revoke lock so everyone updates.  Should be alright if
892                    someone was already reading while we were updating the logs,
893                    so we don't really need to hold the lock while we're
894                    writing (above). */
895                 mgs_revoke_lock(obd, fsname, &lockh);
896
897 out_free:
898                 OBD_FREE(lcfg, data->ioc_plen1);
899                 RETURN(rc);
900         }
901
902         case OBD_IOC_POOL: {
903                 RETURN(mgs_iocontrol_pool(obd, data));
904         }
905
906         case OBD_IOC_DUMP_LOG: {
907                 struct llog_ctxt *ctxt;
908                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
909                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
910                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
911                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
912                 llog_ctxt_put(ctxt);
913
914                 RETURN(rc);
915         }
916
917         case OBD_IOC_LLOG_CHECK:
918         case OBD_IOC_LLOG_INFO:
919         case OBD_IOC_LLOG_PRINT: {
920                 struct llog_ctxt *ctxt;
921                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
922
923                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
924                 rc = llog_ioctl(ctxt, cmd, data);
925                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
926                 llog_ctxt_put(ctxt);
927
928                 RETURN(rc);
929         }
930
931         default:
932                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
933                 RETURN(-EINVAL);
934         }
935         RETURN(0);
936 }
937
938 /* use obd ops to offer management infrastructure */
939 static struct obd_ops mgs_obd_ops = {
940         .o_owner           = THIS_MODULE,
941         .o_connect         = mgs_connect,
942         .o_reconnect       = mgs_reconnect,
943         .o_disconnect      = mgs_disconnect,
944         .o_setup           = mgs_setup,
945         .o_precleanup      = mgs_precleanup,
946         .o_cleanup         = mgs_cleanup,
947         .o_init_export     = mgs_init_export,
948         .o_destroy_export  = mgs_destroy_export,
949         .o_iocontrol       = mgs_iocontrol,
950         .o_llog_init       = mgs_llog_init,
951         .o_llog_finish     = mgs_llog_finish
952 };
953
954 static int __init mgs_init(void)
955 {
956         struct lprocfs_static_vars lvars;
957
958         lprocfs_mgs_init_vars(&lvars);
959         class_register_type(&mgs_obd_ops, NULL,
960                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
961
962         return 0;
963 }
964
965 static void /*__exit*/ mgs_exit(void)
966 {
967         class_unregister_type(LUSTRE_MGS_NAME);
968 }
969
970 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
971 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
972 MODULE_LICENSE("GPL");
973
974 module_init(mgs_init);
975 module_exit(mgs_exit);