Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
84
85         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 exp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, exp, localdata);
94
95         if (rc) {
96                 class_disconnect(exp);
97         } else {
98                 class_export_put(exp);
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data,
107                          void *localdata)
108 {
109         ENTRY;
110
111         if (exp == NULL || obd == NULL || cluuid == NULL)
112                 RETURN(-EINVAL);
113
114         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
115
116         if (data != NULL) {
117                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
118                 exp->exp_connect_flags = data->ocd_connect_flags;
119                 data->ocd_version = LUSTRE_VERSION_CODE;
120         }
121
122         RETURN(0);
123 }
124
125 static int mgs_disconnect(struct obd_export *exp)
126 {
127         int rc;
128         ENTRY;
129
130         LASSERT(exp);
131
132         class_export_get(exp);
133         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
134
135         /* Disconnect early so that clients can't keep using export */
136         rc = class_disconnect(exp);
137         ldlm_cancel_locks_for_export(exp);
138
139         lprocfs_exp_cleanup(exp);
140
141         /* complete all outstanding replies */
142         spin_lock(&exp->exp_lock);
143         while (!list_empty(&exp->exp_outstanding_replies)) {
144                 struct ptlrpc_reply_state *rs =
145                         list_entry(exp->exp_outstanding_replies.next,
146                                    struct ptlrpc_reply_state, rs_exp_list);
147                 struct ptlrpc_service *svc = rs->rs_service;
148
149                 spin_lock(&svc->srv_lock);
150                 list_del_init(&rs->rs_exp_list);
151                 ptlrpc_schedule_difficult_reply(rs);
152                 spin_unlock(&svc->srv_lock);
153         }
154         spin_unlock(&exp->exp_lock);
155
156         class_export_put(exp);
157         RETURN(rc);
158 }
159
160 static int mgs_cleanup(struct obd_device *obd);
161 static int mgs_handle(struct ptlrpc_request *req);
162
163 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
164                          struct obd_device *tgt, int count,
165                          struct llog_catid *logid, struct obd_uuid *uuid)
166 {
167         int rc;
168         ENTRY;
169
170         LASSERT(olg == &obd->obd_olg);
171         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
172                         &llog_lvfs_ops);
173         RETURN(rc);
174 }
175
176 static int mgs_llog_finish(struct obd_device *obd, int count)
177 {
178         struct llog_ctxt *ctxt;
179         int rc = 0;
180         ENTRY;
181
182         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
183         if (ctxt)
184                 rc = llog_cleanup(ctxt);
185
186         RETURN(rc);
187 }
188
189 /* Start the MGS obd */
190 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
191 {
192         struct lprocfs_static_vars lvars;
193         struct mgs_obd *mgs = &obd->u.mgs;
194         struct lustre_mount_info *lmi;
195         struct lustre_sb_info *lsi;
196         struct vfsmount *mnt;
197         int rc = 0;
198         ENTRY;
199
200         CDEBUG(D_CONFIG, "Starting MGS\n");
201
202         /* Find our disk */
203         lmi = server_get_mount(obd->obd_name);
204         if (!lmi)
205                 RETURN(rc = -EINVAL);
206
207         mnt = lmi->lmi_mnt;
208         lsi = s2lsi(lmi->lmi_sb);
209         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
210         if (IS_ERR(obd->obd_fsops))
211                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
212
213         /* namespace for mgs llog */
214         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
215                                                 LDLM_NAMESPACE_MODEST);
216         if (obd->obd_namespace == NULL)
217                 GOTO(err_ops, rc = -ENOMEM);
218
219         /* ldlm setup */
220         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
221                            "mgs_ldlm_client", &obd->obd_ldlm_client);
222
223         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
224
225         rc = mgs_fs_setup(obd, mnt);
226         if (rc) {
227                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
228                        obd->obd_name, rc);
229                 GOTO(err_ns, rc);
230         }
231
232         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
233         if (rc)
234                 GOTO(err_fs, rc);
235
236         /* No recovery for MGC's */
237         obd->obd_replayable = 0;
238
239         /* Internal mgs setup */
240         mgs_init_fsdb_list(obd);
241         sema_init(&mgs->mgs_sem, 1);
242
243         /* Start the service threads */
244         mgs->mgs_service =
245                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
246                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
247                                 MGC_REPLY_PORTAL, 2000,
248                                 mgs_handle, LUSTRE_MGS_NAME,
249                                 obd->obd_proc_entry, target_print_req,
250                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
251                                 "ll_mgs", LCT_MD_THREAD);
252
253         if (!mgs->mgs_service) {
254                 CERROR("failed to start service\n");
255                 GOTO(err_llog, rc = -ENOMEM);
256         }
257
258         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
259         if (rc)
260                 GOTO(err_thread, rc);
261
262         /* Setup proc */
263         lprocfs_mgs_init_vars(&lvars);
264         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
265                 lproc_mgs_setup(obd);
266         }
267
268         ping_evictor_start();
269
270         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
271
272         RETURN(0);
273
274 err_thread:
275         ptlrpc_unregister_service(mgs->mgs_service);
276 err_llog:
277         obd_llog_finish(obd, 0);
278 err_fs:
279         /* No extra cleanup needed for llog_init_commit_thread() */
280         mgs_fs_cleanup(obd);
281 err_ns:
282         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
283         obd->obd_namespace = NULL;
284 err_ops:
285         fsfilt_put_ops(obd->obd_fsops);
286 err_put:
287         server_put_mount(obd->obd_name, mnt);
288         mgs->mgs_sb = 0;
289         return rc;
290 }
291
292 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
293 {
294         int rc = 0;
295         ENTRY;
296
297         switch (stage) {
298         case OBD_CLEANUP_EARLY:
299         case OBD_CLEANUP_EXPORTS:
300                 rc = obd_llog_finish(obd, 0);
301                 break;
302         }
303         RETURN(rc);
304 }
305
306 /**
307  * Performs cleanup procedures for passed \a obd given it is mgs obd.
308  */
309 static int mgs_cleanup(struct obd_device *obd)
310 {
311         struct mgs_obd *mgs = &obd->u.mgs;
312         ENTRY;
313
314         if (mgs->mgs_sb == NULL)
315                 RETURN(0);
316
317         ping_evictor_stop();
318
319         ptlrpc_unregister_service(mgs->mgs_service);
320
321         mgs_cleanup_fsdb_list(obd);
322         lproc_mgs_cleanup(obd);
323         mgs_fs_cleanup(obd);
324
325         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
326         mgs->mgs_sb = NULL;
327
328         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
329         obd->obd_namespace = NULL;
330
331         fsfilt_put_ops(obd->obd_fsops);
332
333         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
334         RETURN(0);
335 }
336
337 /* similar to filter_prepare_destroy */
338 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
339                             struct lustre_handle *lockh)
340 {
341         struct ldlm_res_id res_id;
342         int rc, flags = 0;
343         ENTRY;
344
345         rc = mgc_fsname2resid(fsname, &res_id);
346         if (!rc)
347                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
348                                             LDLM_PLAIN, NULL, LCK_EX,
349                                             &flags, ldlm_blocking_ast,
350                                             ldlm_completion_ast, NULL,
351                                             fsname, 0, NULL, lockh);
352         if (rc)
353                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
354
355         RETURN(rc);
356 }
357
358 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
359 {
360         ENTRY;
361         ldlm_lock_decref(lockh, LCK_EX);
362         RETURN(0);
363 }
364
365 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
366                             struct lustre_handle *lockh)
367 {
368         int lockrc;
369
370         if (fsname[0]) {
371                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
372                 if (lockrc != ELDLM_OK)
373                         CERROR("lock error %d for fs %s\n", lockrc,
374                                fsname);
375                 else
376                         mgs_put_cfg_lock(lockh);
377         }
378 }
379
380 /* rc=0 means ok
381       1 means update
382      <0 means error */
383 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
384 {
385         int rc;
386         ENTRY;
387
388         rc = mgs_check_index(obd, mti);
389         if (rc == 0) {
390                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
391                                    "this MGS does not know about it. Assuming"
392                                    " writeconf.\n", mti->mti_svname);
393                 mti->mti_flags |= LDD_F_WRITECONF;
394                 rc = 1;
395         } else if (rc == -1) {
396                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
397                                    "disappeared! Regenerating all logs.\n",
398                                    mti->mti_fsname);
399                 mti->mti_flags |= LDD_F_WRITECONF;
400                 rc = 1;
401         } else {
402                 /* Index is correctly marked as used */
403
404                 /* If the logs don't contain the mti_nids then add
405                    them as failover nids */
406                 rc = mgs_check_failnid(obd, mti);
407         }
408
409         RETURN(rc);
410 }
411
412 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
413 static int mgs_handle_target_reg(struct ptlrpc_request *req)
414 {
415         struct obd_device *obd = req->rq_export->exp_obd;
416         struct lustre_handle lockh;
417         struct mgs_target_info *mti, *rep_mti;
418         int rc = 0, lockrc;
419         ENTRY;
420
421         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
422
423         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
424         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
425                                 LDD_F_UPDATE))) {
426                 /* We're just here as a startup ping. */
427                 CDEBUG(D_MGS, "Server %s is running on %s\n",
428                        mti->mti_svname, obd_export_nid2str(req->rq_export));
429                 rc = mgs_check_target(obd, mti);
430                 /* above will set appropriate mti flags */
431                 if (rc <= 0)
432                         /* Nothing wrong, or fatal error */
433                         GOTO(out_nolock, rc);
434         }
435
436         /* Revoke the config lock to make sure nobody is reading. */
437         /* Although actually I think it should be alright if
438            someone was reading while we were updating the logs - if we
439            revoke at the end they will just update from where they left off. */
440         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
441         if (lockrc != ELDLM_OK) {
442                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
443                                    "update their configuration (%d). Updating "
444                                    "local logs anyhow; you might have to "
445                                    "manually restart other nodes to get the "
446                                    "latest configuration.\n",
447                                    obd->obd_name, lockrc);
448         }
449
450         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
451
452         /* Log writing contention is handled by the fsdb_sem */
453
454         if (mti->mti_flags & LDD_F_WRITECONF) {
455                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
456                     mti->mti_stripe_index == 0) {
457                         rc = mgs_erase_logs(obd, mti->mti_fsname);
458                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
459                                       "request.  All servers must be restarted "
460                                       "in order to regenerate the logs."
461                                       "\n", obd->obd_name, mti->mti_fsname);
462                 } else if (mti->mti_flags &
463                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
464                         rc = mgs_erase_log(obd, mti->mti_svname);
465                         LCONSOLE_WARN("%s: Regenerating %s log by user "
466                                       "request.\n",
467                                       obd->obd_name, mti->mti_svname);
468                 }
469                 mti->mti_flags |= LDD_F_UPDATE;
470                 /* Erased logs means start from scratch. */
471                 mti->mti_flags &= ~LDD_F_UPGRADE14;
472         }
473
474         /* COMPAT_146 */
475         if (mti->mti_flags & LDD_F_UPGRADE14) {
476                 rc = mgs_upgrade_sv_14(obd, mti);
477                 if (rc) {
478                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
479                         GOTO(out, rc);
480                 }
481
482                 /* We're good to go */
483                 mti->mti_flags |= LDD_F_UPDATE;
484         }
485         /* end COMPAT_146 */
486
487         if (mti->mti_flags & LDD_F_UPDATE) {
488                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
489                        mti->mti_stripe_index);
490
491                 /* create or update the target log
492                    and update the client/mdt logs */
493                 rc = mgs_write_log_target(obd, mti);
494                 if (rc) {
495                         CERROR("Failed to write %s log (%d)\n",
496                                mti->mti_svname, rc);
497                         GOTO(out, rc);
498                 }
499
500                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
501                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
502                                     LDD_F_UPGRADE14);
503                 mti->mti_flags |= LDD_F_REWRITE_LDD;
504         }
505
506 out:
507         /* done with log update */
508         if (lockrc == ELDLM_OK)
509                 mgs_put_cfg_lock(&lockh);
510 out_nolock:
511         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
512                mti->mti_stripe_index, rc);
513         rc = req_capsule_server_pack(&req->rq_pill);
514         if (rc)
515                 RETURN(rc);
516
517         /* send back the whole mti in the reply */
518         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
519         *rep_mti = *mti;
520
521         /* Flush logs to disk */
522         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
523         RETURN(rc);
524 }
525
526 static int mgs_set_info_rpc(struct ptlrpc_request *req)
527 {
528         struct obd_device *obd = req->rq_export->exp_obd;
529         struct mgs_send_param *msp, *rep_msp;
530         struct lustre_handle lockh;
531         int rc;
532         struct lustre_cfg_bufs bufs;
533         struct lustre_cfg *lcfg;
534         char fsname[MTI_NAME_MAXLEN];
535         ENTRY;
536
537         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
538         LASSERT(msp);
539
540         /* Construct lustre_cfg structure to pass to function mgs_setparam */
541         lustre_cfg_bufs_reset(&bufs, NULL);
542         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
543         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
544         rc = mgs_setparam(obd, lcfg, fsname);
545         if (rc) {
546                 CERROR("Error %d in setting the parameter %s for fs %s\n",
547                        rc, msp->mgs_param, fsname);
548                 RETURN(rc);
549         }
550
551         /* request for update */
552         mgs_revoke_lock(obd, fsname, &lockh);
553
554         lustre_cfg_free(lcfg);
555
556         rc = req_capsule_server_pack(&req->rq_pill);
557         if (rc == 0) {
558                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
559                 rep_msp = msp;
560         }
561         RETURN(rc);
562 }
563
564 /* Called whenever a target cleans up. */
565 /* XXX - Currently unused */
566 static int mgs_handle_target_del(struct ptlrpc_request *req)
567 {
568         ENTRY;
569         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
570         RETURN(0);
571 }
572
573 /* XXX - Currently unused */
574 static int mgs_handle_exception(struct ptlrpc_request *req)
575 {
576         ENTRY;
577         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
578         RETURN(0);
579 }
580
581 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
582 int mgs_handle(struct ptlrpc_request *req)
583 {
584         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
585         int opc, rc = 0;
586         ENTRY;
587
588         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
589         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
590         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
591                 RETURN(0);
592
593         LASSERT(current->journal_info == NULL);
594         opc = lustre_msg_get_opc(req->rq_reqmsg);
595         if (opc != MGS_CONNECT) {
596                 if (req->rq_export == NULL) {
597                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
598                                opc);
599                         req->rq_status = -ENOTCONN;
600                         GOTO(out, rc = -ENOTCONN);
601                 }
602         }
603
604         switch (opc) {
605         case MGS_CONNECT:
606                 DEBUG_REQ(D_MGS, req, "connect");
607                 /* MGS and MDS have same request format for connect */
608                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
609                 rc = target_handle_connect(req);
610                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
611                         /* Make clients trying to reconnect after a MGS restart
612                            happy; also requires obd_replayable */
613                         lustre_msg_add_op_flags(req->rq_repmsg,
614                                                 MSG_CONNECT_RECONNECT);
615                 break;
616         case MGS_DISCONNECT:
617                 DEBUG_REQ(D_MGS, req, "disconnect");
618                 /* MGS and MDS have same request format for disconnect */
619                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
620                 rc = target_handle_disconnect(req);
621                 req->rq_status = rc;            /* superfluous? */
622                 break;
623         case MGS_EXCEPTION:
624                 DEBUG_REQ(D_MGS, req, "exception");
625                 rc = mgs_handle_exception(req);
626                 break;
627         case MGS_TARGET_REG:
628                 DEBUG_REQ(D_MGS, req, "target add");
629                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
630                 rc = mgs_handle_target_reg(req);
631                 break;
632         case MGS_TARGET_DEL:
633                 DEBUG_REQ(D_MGS, req, "target del");
634                 rc = mgs_handle_target_del(req);
635                 break;
636         case MGS_SET_INFO:
637                 DEBUG_REQ(D_MGS, req, "set_info");
638                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
639                 rc = mgs_set_info_rpc(req);
640                 break;
641
642         case LDLM_ENQUEUE:
643                 DEBUG_REQ(D_MGS, req, "enqueue");
644                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
645                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
646                                          ldlm_server_blocking_ast, NULL);
647                 break;
648         case LDLM_BL_CALLBACK:
649         case LDLM_CP_CALLBACK:
650                 DEBUG_REQ(D_MGS, req, "callback");
651                 CERROR("callbacks should not happen on MGS\n");
652                 LBUG();
653                 break;
654
655         case OBD_PING:
656                 DEBUG_REQ(D_INFO, req, "ping");
657                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
658                 rc = target_handle_ping(req);
659                 break;
660         case OBD_LOG_CANCEL:
661                 DEBUG_REQ(D_MGS, req, "log cancel");
662                 rc = -ENOTSUPP; /* la la la */
663                 break;
664
665         case LLOG_ORIGIN_HANDLE_CREATE:
666                 DEBUG_REQ(D_MGS, req, "llog_init");
667                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
668                 rc = llog_origin_handle_create(req);
669                 break;
670         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
671                 DEBUG_REQ(D_MGS, req, "llog next block");
672                 req_capsule_set(&req->rq_pill,
673                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
674                 rc = llog_origin_handle_next_block(req);
675                 break;
676         case LLOG_ORIGIN_HANDLE_READ_HEADER:
677                 DEBUG_REQ(D_MGS, req, "llog read header");
678                 req_capsule_set(&req->rq_pill,
679                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
680                 rc = llog_origin_handle_read_header(req);
681                 break;
682         case LLOG_ORIGIN_HANDLE_CLOSE:
683                 DEBUG_REQ(D_MGS, req, "llog close");
684                 rc = llog_origin_handle_close(req);
685                 break;
686         case LLOG_CATINFO:
687                 DEBUG_REQ(D_MGS, req, "llog catinfo");
688                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
689                 rc = llog_catinfo(req);
690                 break;
691         default:
692                 req->rq_status = -ENOTSUPP;
693                 rc = ptlrpc_error(req);
694                 RETURN(rc);
695         }
696
697         LASSERT(current->journal_info == NULL);
698
699         if (rc)
700                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
701
702 out:
703         target_send_reply(req, rc, fail);
704         RETURN(0);
705 }
706
707 static inline int mgs_init_export(struct obd_export *exp)
708 {
709         return ldlm_init_export(exp);
710 }
711
712 static inline int mgs_destroy_export(struct obd_export *exp)
713 {
714         ENTRY;
715
716         target_destroy_export(exp);
717         mgs_client_free(exp);
718         ldlm_destroy_export(exp);
719
720         RETURN(0);
721 }
722
723 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
724 {
725         char *ptr;
726
727         ENTRY;
728         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
729                 *fsname = *ptr;
730                 fsname++;
731         }
732         if (*ptr == '\0')
733                 return -EINVAL;
734         *fsname = '\0';
735         ptr++;
736         strcpy(poolname, ptr);
737
738         RETURN(0);
739 }
740
741 static int mgs_iocontrol_pool(struct obd_device *obd,
742                               struct obd_ioctl_data *data)
743 {
744         int rc;
745         struct lustre_handle lockh;
746         struct lustre_cfg *lcfg = NULL;
747         struct llog_rec_hdr rec;
748         char *fsname = NULL;
749         char *poolname = NULL;
750         ENTRY;
751
752         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
753         if (fsname == NULL)
754                 RETURN(-ENOMEM);
755
756         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
757         if (poolname == NULL) {
758                 rc = -ENOMEM;
759                 GOTO(out_pool, rc);
760         }
761         rec.lrh_len = llog_data_len(data->ioc_plen1);
762
763         if (data->ioc_type == LUSTRE_CFG_TYPE) {
764                 rec.lrh_type = OBD_CFG_REC;
765         } else {
766                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
767                 rc = -EINVAL;
768                 GOTO(out_pool, rc);
769         }
770
771         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
772                 rc = -E2BIG;
773                 GOTO(out_pool, rc);
774         }
775
776         OBD_ALLOC(lcfg, data->ioc_plen1);
777         if (lcfg == NULL)
778                 GOTO(out_pool, rc = -ENOMEM);
779
780         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
781                 GOTO(out_pool, rc = -EFAULT);
782
783         if (lcfg->lcfg_bufcount < 2) {
784                 GOTO(out_pool, rc = -EFAULT);
785         }
786
787         /* first arg is always <fsname>.<poolname> */
788         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
789                             poolname);
790
791         switch (lcfg->lcfg_command) {
792         case LCFG_POOL_NEW: {
793                 if (lcfg->lcfg_bufcount != 2)
794                         RETURN(-EINVAL);
795                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
796                                   poolname, NULL);
797                 break;
798         }
799         case LCFG_POOL_ADD: {
800                 if (lcfg->lcfg_bufcount != 3)
801                         RETURN(-EINVAL);
802                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
803                                   lustre_cfg_string(lcfg, 2));
804                 break;
805         }
806         case LCFG_POOL_REM: {
807                 if (lcfg->lcfg_bufcount != 3)
808                         RETURN(-EINVAL);
809                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
810                                   lustre_cfg_string(lcfg, 2));
811                 break;
812         }
813         case LCFG_POOL_DEL: {
814                 if (lcfg->lcfg_bufcount != 2)
815                         RETURN(-EINVAL);
816                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
817                                   poolname, NULL);
818                 break;
819         }
820         default: {
821                  rc = -EINVAL;
822                  GOTO(out_pool, rc);
823         }
824         }
825
826         if (rc) {
827                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
828                        rc, lcfg->lcfg_command, fsname, poolname);
829                 GOTO(out_pool, rc);
830         }
831
832         /* request for update */
833         mgs_revoke_lock(obd, fsname, &lockh);
834
835 out_pool:
836         if (lcfg != NULL)
837                 OBD_FREE(lcfg, data->ioc_plen1);
838
839         if (fsname != NULL)
840                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
841
842         if (poolname != NULL)
843                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
844
845         RETURN(rc);
846 }
847
848 /* from mdt_iocontrol */
849 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
850                   void *karg, void *uarg)
851 {
852         struct obd_device *obd = exp->exp_obd;
853         struct obd_ioctl_data *data = karg;
854         struct lvfs_run_ctxt saved;
855         int rc = 0;
856
857         ENTRY;
858         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
859
860         switch (cmd) {
861
862         case OBD_IOC_PARAM: {
863                 struct lustre_handle lockh;
864                 struct lustre_cfg *lcfg;
865                 struct llog_rec_hdr rec;
866                 char fsname[MTI_NAME_MAXLEN];
867
868                 rec.lrh_len = llog_data_len(data->ioc_plen1);
869
870                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
871                         rec.lrh_type = OBD_CFG_REC;
872                 } else {
873                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
874                         RETURN(-EINVAL);
875                 }
876
877                 OBD_ALLOC(lcfg, data->ioc_plen1);
878                 if (lcfg == NULL)
879                         RETURN(-ENOMEM);
880                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
881                         GOTO(out_free, rc = -EFAULT);
882
883                 if (lcfg->lcfg_bufcount < 1)
884                         GOTO(out_free, rc = -EINVAL);
885
886                 rc = mgs_setparam(obd, lcfg, fsname);
887                 if (rc) {
888                         CERROR("setparam err %d\n", rc);
889                         GOTO(out_free, rc);
890                 }
891
892                 /* Revoke lock so everyone updates.  Should be alright if
893                    someone was already reading while we were updating the logs,
894                    so we don't really need to hold the lock while we're
895                    writing (above). */
896                 mgs_revoke_lock(obd, fsname, &lockh);
897
898 out_free:
899                 OBD_FREE(lcfg, data->ioc_plen1);
900                 RETURN(rc);
901         }
902
903         case OBD_IOC_POOL: {
904                 RETURN(mgs_iocontrol_pool(obd, data));
905         }
906
907         case OBD_IOC_DUMP_LOG: {
908                 struct llog_ctxt *ctxt;
909                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
910                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
911                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
912                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
913                 llog_ctxt_put(ctxt);
914
915                 RETURN(rc);
916         }
917
918         case OBD_IOC_LLOG_CHECK:
919         case OBD_IOC_LLOG_INFO:
920         case OBD_IOC_LLOG_PRINT: {
921                 struct llog_ctxt *ctxt;
922                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
923
924                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
925                 rc = llog_ioctl(ctxt, cmd, data);
926                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
927                 llog_ctxt_put(ctxt);
928
929                 RETURN(rc);
930         }
931
932         default:
933                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
934                 RETURN(-EINVAL);
935         }
936         RETURN(0);
937 }
938
939 /* use obd ops to offer management infrastructure */
940 static struct obd_ops mgs_obd_ops = {
941         .o_owner           = THIS_MODULE,
942         .o_connect         = mgs_connect,
943         .o_reconnect       = mgs_reconnect,
944         .o_disconnect      = mgs_disconnect,
945         .o_setup           = mgs_setup,
946         .o_precleanup      = mgs_precleanup,
947         .o_cleanup         = mgs_cleanup,
948         .o_init_export     = mgs_init_export,
949         .o_destroy_export  = mgs_destroy_export,
950         .o_iocontrol       = mgs_iocontrol,
951         .o_llog_init       = mgs_llog_init,
952         .o_llog_finish     = mgs_llog_finish
953 };
954
955 static int __init mgs_init(void)
956 {
957         struct lprocfs_static_vars lvars;
958
959         lprocfs_mgs_init_vars(&lvars);
960         class_register_type(&mgs_obd_ops, NULL,
961                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
962
963         return 0;
964 }
965
966 static void /*__exit*/ mgs_exit(void)
967 {
968         class_unregister_type(LUSTRE_MGS_NAME);
969 }
970
971 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
972 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
973 MODULE_LICENSE("GPL");
974
975 module_init(mgs_init);
976 module_exit(mgs_exit);