Whamcloud - gitweb
- start seq mgr for data stack in separate portal;
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52  * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
54         (0x400),
55         ((__u64)~0ULL)
56 };
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
58
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
61         0,
62         0
63 };
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
65
66 /* assigns client to sequence controller node */
67 int seq_server_set_cli(struct lu_server_seq *seq,
68                        struct lu_client_seq *cli,
69                        const struct lu_context *ctx)
70 {
71         int rc = 0;
72         ENTRY;
73
74         if (cli == NULL) {
75                 CDEBUG(D_INFO|D_WARNING, "%s: detached "
76                        "sequence mgr client %s\n", seq->lss_name,
77                        cli->lcs_exp->exp_client_uuid.uuid);
78                 seq->lss_cli = cli;
79                 RETURN(0);
80         }
81
82         if (seq->lss_cli) {
83                 CERROR("%s: sequence-controller is already "
84                        "assigned\n", seq->lss_name);
85                 RETURN(-EINVAL);
86         }
87
88         CDEBUG(D_INFO|D_WARNING, "%s: attached "
89                "sequence client %s\n", seq->lss_name,
90                cli->lcs_exp->exp_client_uuid.uuid);
91
92         /* asking client for new range, assign that range to ->seq_super and
93          * write seq state to backing store should be atomic. */
94         down(&seq->lss_sem);
95
96         /* assign controller */
97         seq->lss_cli = cli;
98
99         /* get new range from controller only if super-sequence is not yet
100          * initialized from backing store or something else. */
101         if (range_is_zero(&seq->lss_super)) {
102                 rc = seq_client_alloc_super(cli);
103                 if (rc) {
104                         CERROR("can't allocate super-sequence, "
105                                "rc %d\n", rc);
106                         GOTO(out_up, rc);
107                 }
108
109                 /* take super-seq from client seq mgr */
110                 LASSERT(range_is_sane(&cli->lcs_range));
111
112                 seq->lss_super = cli->lcs_range;
113
114                 /* save init seq to backing store. */
115                 rc = seq_store_write(seq, ctx);
116                 if (rc) {
117                         CERROR("can't write sequence state, "
118                                "rc = %d\n", rc);
119                 }
120         }
121
122         EXIT;
123 out_up:
124         up(&seq->lss_sem);
125         return rc;
126 }
127 EXPORT_SYMBOL(seq_server_set_cli);
128
129 /* on controller node, allocate new super sequence for regular sequence
130  * server. */
131 static int __seq_server_alloc_super(struct lu_server_seq *seq,
132                                     struct lu_range *range,
133                                     const struct lu_context *ctx)
134 {
135         struct lu_range *space = &seq->lss_space;
136         int rc;
137         ENTRY;
138
139         LASSERT(range_is_sane(space));
140
141         if (range_space(space) < seq->lss_super_width) {
142                 CWARN("sequences space is going to exhaust soon. "
143                       "Can allocate only "LPU64" sequences\n",
144                       range_space(space));
145                 *range = *space;
146                 space->lr_start = space->lr_end;
147                 rc = 0;
148         } else if (range_is_exhausted(space)) {
149                 CERROR("sequences space is exhausted\n");
150                 rc = -ENOSPC;
151         } else {
152                 range_alloc(range, space, seq->lss_super_width);
153                 rc = 0;
154         }
155
156         if (rc == 0) {
157                 rc = seq_store_write(seq, ctx);
158                 if (rc) {
159                         CERROR("can't save state, rc = %d\n",
160                                rc);
161                 }
162         }
163
164         if (rc == 0) {
165                 CDEBUG(D_INFO, "%s: allocated super-sequence "
166                        DRANGE"\n", seq->lss_name, PRANGE(range));
167         }
168
169         RETURN(rc);
170 }
171
172 static int seq_server_alloc_super(struct lu_server_seq *seq,
173                                   struct lu_range *range,
174                                   const struct lu_context *ctx)
175 {
176         int rc;
177         ENTRY;
178
179         down(&seq->lss_sem);
180         rc = __seq_server_alloc_super(seq, range, ctx);
181         up(&seq->lss_sem);
182
183         RETURN(rc);
184 }
185
186 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
187                                    struct lu_range *range,
188                                    const struct lu_context *ctx)
189 {
190         struct lu_range *super = &seq->lss_super;
191         int rc = 0;
192         ENTRY;
193
194         LASSERT(range_is_sane(super));
195
196         /* XXX: here we should avoid cascading RPCs using kind of async
197          * preallocation when meta-sequence is close to exhausting. */
198         if (range_is_exhausted(super)) {
199                 if (!seq->lss_cli) {
200                         CERROR("no seq-controller client is setup\n");
201                         RETURN(-EOPNOTSUPP);
202                 }
203
204                 rc = seq_client_alloc_super(seq->lss_cli);
205                 if (rc) {
206                         CERROR("can't allocate new super-sequence, "
207                                "rc %d\n", rc);
208                         RETURN(rc);
209                 }
210
211                 /* saving new range into allocation space. */
212                 *super = seq->lss_cli->lcs_range;
213                 LASSERT(range_is_sane(super));
214         }
215         range_alloc(range, super, seq->lss_meta_width);
216
217         rc = seq_store_write(seq, ctx);
218         if (rc) {
219                 CERROR("can't save state, rc = %d\n",
220                        rc);
221         }
222
223         if (rc == 0) {
224                 CDEBUG(D_INFO, "%s: allocated meta-sequence "
225                        DRANGE"\n", seq->lss_name, PRANGE(range));
226         }
227
228         RETURN(rc);
229 }
230
231 static int seq_server_alloc_meta(struct lu_server_seq *seq,
232                                  struct lu_range *range,
233                                  const struct lu_context *ctx)
234 {
235         int rc;
236         ENTRY;
237
238         down(&seq->lss_sem);
239         rc = __seq_server_alloc_meta(seq, range, ctx);
240         up(&seq->lss_sem);
241
242         RETURN(rc);
243 }
244
245 static int seq_req_handle0(const struct lu_context *ctx,
246                            struct ptlrpc_request *req,
247                            struct seq_thread_info *info)
248 {
249         struct lu_site *site;
250         struct lu_range *out;
251         int rc = -EPROTO;
252         __u32 *opc;
253         ENTRY;
254
255         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
256         LASSERT(site != NULL);
257                         
258         req_capsule_pack(&info->sti_pill);
259
260         opc = req_capsule_client_get(&info->sti_pill,
261                                      &RMF_SEQ_OPC);
262         if (opc != NULL) {
263                 out = req_capsule_server_get(&info->sti_pill,
264                                              &RMF_SEQ_RANGE);
265                 if (out == NULL)
266                         RETURN(-EPROTO);
267
268                 switch (*opc) {
269                 case SEQ_ALLOC_META:
270                         if (!site->ls_server_seq) {
271                                 CERROR("sequence-server is not "
272                                        "initialized\n");
273                                 RETURN(-EINVAL);
274                         }
275                         rc = seq_server_alloc_meta(site->ls_server_seq,
276                                                    out, ctx);
277                         break;
278                 case SEQ_ALLOC_SUPER:
279                         if (!site->ls_control_seq) {
280                                 CERROR("sequence-controller is not "
281                                        "initialized\n");
282                                 RETURN(-EINVAL);
283                         }
284                         rc = seq_server_alloc_super(site->ls_control_seq,
285                                                     out, ctx);
286                         break;
287                 default:
288                         CERROR("wrong opc %#x\n", *opc);
289                         break;
290                 }
291         }
292
293         RETURN(rc);
294 }
295
296 static void *seq_thread_init(const struct lu_context *ctx,
297                              struct lu_context_key *key)
298 {
299         struct seq_thread_info *info;
300
301         /*
302          * check that no high order allocations are incurred.
303          */
304         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
305         OBD_ALLOC_PTR(info);
306         if (info == NULL)
307                 info = ERR_PTR(-ENOMEM);
308         return info;
309 }
310
/*
 * lu_context_key fini hook: release the seq_thread_info passed in @data.
 */
static void seq_thread_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct seq_thread_info *sti = data;

        OBD_FREE_PTR(sti);
}
317
318 struct lu_context_key seq_thread_key = {
319         .lct_tags = LCT_MD_THREAD,
320         .lct_init = seq_thread_init,
321         .lct_fini = seq_thread_fini
322 };
323
324 static void seq_thread_info_init(struct ptlrpc_request *req,
325                                  struct seq_thread_info *info)
326 {
327         int i;
328
329         /* mark rep buffer as req-layout stuff expects */
330         for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
331                 info->sti_rep_buf_size[i] = -1;
332
333         /* init request capsule */
334         req_capsule_init(&info->sti_pill, req, RCL_SERVER,
335                          info->sti_rep_buf_size);
336
337         req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
338 }
339
340 static void seq_thread_info_fini(struct seq_thread_info *info)
341 {
342         req_capsule_fini(&info->sti_pill);
343 }
344
345 static int seq_req_handle(struct ptlrpc_request *req)
346 {
347         const struct lu_context *ctx;
348         struct seq_thread_info *info;
349         int rc = 0;
350         ENTRY;
351
352         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
353
354         ctx = req->rq_svc_thread->t_ctx;
355         LASSERT(ctx != NULL);
356         LASSERT(ctx->lc_thread == req->rq_svc_thread);
357
358         info = lu_context_key_get(ctx, &seq_thread_key);
359         LASSERT(info != NULL);
360
361         seq_thread_info_init(req, info);
362
363         if (req->rq_reqmsg->opc == SEQ_QUERY) {
364                 if (req->rq_export != NULL) {
365                         /* 
366                          * no need to return error here and overwrite @rc, this
367                          * function should return 0 even if seq_req_handle0()
368                          * returns some error code.
369                          */
370                         seq_req_handle0(ctx, req, info);
371                 } else {
372                         CERROR("Unconnected request\n");
373                         req->rq_status = -ENOTCONN;
374                 }
375         } else {
376                 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
377                 req->rq_status = -ENOTSUPP;
378                 rc = ptlrpc_error(req);
379                 GOTO(out_info, rc);
380         }
381
382         target_send_reply(req, rc, OBD_FAIL_SEQ_ALL_REPLY_NET);
383         EXIT;
384 out_info:
385         seq_thread_info_fini(info);
386         return rc;
387 }
388
389 static void seq_server_proc_fini(struct lu_server_seq *seq);
390
391 #ifdef LPROCFS
392 static int seq_server_proc_init(struct lu_server_seq *seq)
393 {
394         int rc;
395         ENTRY;
396
397         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
398                                              proc_lustre_root,
399                                              NULL, NULL);
400         if (IS_ERR(seq->lss_proc_dir)) {
401                 CERROR("LProcFS failed in seq-init\n");
402                 rc = PTR_ERR(seq->lss_proc_dir);
403                 RETURN(rc);
404         }
405
406         seq->lss_proc_entry = lprocfs_register("services",
407                                                seq->lss_proc_dir,
408                                                NULL, NULL);
409         if (IS_ERR(seq->lss_proc_entry)) {
410                 CERROR("LProcFS failed in seq-init\n");
411                 rc = PTR_ERR(seq->lss_proc_entry);
412                 GOTO(out_cleanup, rc);
413         }
414
415         rc = lprocfs_add_vars(seq->lss_proc_dir,
416                               seq_server_proc_list, seq);
417         if (rc) {
418                 CERROR("can't init sequence manager "
419                        "proc, rc %d\n", rc);
420                 GOTO(out_cleanup, rc);
421         }
422
423         RETURN(0);
424
425 out_cleanup:
426         seq_server_proc_fini(seq);
427         return rc;
428 }
429
430 static void seq_server_proc_fini(struct lu_server_seq *seq)
431 {
432         ENTRY;
433         if (seq->lss_proc_entry != NULL) {
434                 if (!IS_ERR(seq->lss_proc_entry))
435                         lprocfs_remove(seq->lss_proc_entry);
436                 seq->lss_proc_entry = NULL;
437         }
438
439         if (seq->lss_proc_dir != NULL) {
440                 if (!IS_ERR(seq->lss_proc_dir))
441                         lprocfs_remove(seq->lss_proc_dir);
442                 seq->lss_proc_dir = NULL;
443         }
444         EXIT;
445 }
446 #else
/* Stub used when LPROCFS is not defined: nothing to set up, always 0. */
static int seq_server_proc_init(struct lu_server_seq *seq)
{
        const int rc = 0;

        return rc;
}
451
/* Stub used when LPROCFS is not defined: nothing to tear down. */
static void seq_server_proc_fini(struct lu_server_seq *seq)
{
}
456 #endif
457
458 #define LUSTRE_MD_SEQ_NAME "md-seq"
459 #define LUSTRE_CT_SEQ_NAME "ct-seq"
460 #define LUSTRE_DT_SEQ_NAME "dt-seq"
461
462 int seq_server_init(struct lu_server_seq *seq,
463                     struct dt_device *dev,
464                     const char *uuid,
465                     enum lu_mgr_type type,
466                     const struct lu_context *ctx)
467 {
468         int is_srv = type == LUSTRE_SEQ_SERVER;
469         
470         int rc, req_portal = is_srv ?
471                 SEQ_SERVER_PORTAL : SEQ_CONTROLLER_PORTAL;
472
473         struct ptlrpc_service_conf seq_md_conf = {
474                 .psc_nbufs = MDS_NBUFS,
475                 .psc_bufsize = MDS_BUFSIZE,
476                 .psc_max_req_size = SEQ_MAXREQSIZE,
477                 .psc_max_reply_size = SEQ_MAXREPSIZE,
478                 .psc_req_portal = req_portal,
479                 .psc_rep_portal = MDC_REPLY_PORTAL,
480                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
481                 .psc_num_threads = SEQ_NUM_THREADS,
482                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
483         };
484         struct ptlrpc_service_conf seq_dt_conf = {
485                 .psc_nbufs = MDS_NBUFS,
486                 .psc_bufsize = MDS_BUFSIZE,
487                 .psc_max_req_size = SEQ_MAXREQSIZE,
488                 .psc_max_reply_size = SEQ_MAXREPSIZE,
489                 .psc_req_portal = SEQ_SERVER_PORTAL,
490                 .psc_rep_portal = OSC_REPLY_PORTAL,
491                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
492                 .psc_num_threads = SEQ_NUM_THREADS,
493                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
494         };
495         ENTRY;
496
497         LASSERT(dev != NULL);
498         LASSERT(uuid != NULL);
499
500         seq->lss_dev = dev;
501         seq->lss_cli = NULL;
502         seq->lss_type = type;
503         sema_init(&seq->lss_sem, 1);
504
505         seq->lss_super_width = LUSTRE_SEQ_SUPER_WIDTH;
506         seq->lss_meta_width = LUSTRE_SEQ_META_WIDTH;
507
508         snprintf(seq->lss_name, sizeof(seq->lss_name), "%s-%s-%s",
509                  LUSTRE_SEQ_NAME, (is_srv ? "srv" : "ctl"),
510                  uuid);
511
512         seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
513         seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
514
515         lu_device_get(&seq->lss_dev->dd_lu_dev);
516
517         rc = seq_store_init(seq, ctx);
518         if (rc)
519                 GOTO(out, rc);
520
521         /* request backing store for saved sequence info */
522         rc = seq_store_read(seq, ctx);
523         if (rc == -ENODATA) {
524                 CDEBUG(D_INFO|D_WARNING, "%s: no data on "
525                        "storage was found, %s\n", seq->lss_name,
526                        is_srv ? "wait for controller attach" :
527                        "this is first controller run");
528         } else if (rc) {
529                 CERROR("can't read sequence state, rc = %d\n",
530                        rc);
531                 GOTO(out, rc);
532         }
533
534         rc  = seq_server_proc_init(seq);
535         if (rc)
536                 GOTO(out, rc);
537
538         seq->lss_md_service = ptlrpc_init_svc_conf(&seq_md_conf,
539                                                    seq_req_handle,
540                                                    LUSTRE_SEQ_NAME,
541                                                    seq->lss_proc_entry,
542                                                    NULL);
543         if (seq->lss_md_service != NULL)
544                 rc = ptlrpc_start_threads(NULL, seq->lss_md_service,
545                                           is_srv ? LUSTRE_MD_SEQ_NAME :
546                                                    LUSTRE_CT_SEQ_NAME);
547         else
548                 GOTO(out, rc = -ENOMEM);
549
550         /* 
551          * we want to have really cluster-wide sequences space. This is why we
552          * start only one sequence controller which manages space.
553          */
554         if (is_srv) {
555                 seq->lss_dt_service =  ptlrpc_init_svc_conf(&seq_dt_conf,
556                                                             seq_req_handle,
557                                                             LUSTRE_SEQ_NAME,
558                                                             seq->lss_proc_entry,
559                                                             NULL);
560                 if (seq->lss_dt_service != NULL)
561                         rc = ptlrpc_start_threads(NULL, seq->lss_dt_service,
562                                                   LUSTRE_DT_SEQ_NAME);
563                 else
564                         GOTO(out, rc = -ENOMEM);
565         }
566         
567         EXIT;
568 out:
569         if (rc) {
570                 seq_server_fini(seq, ctx);
571         } else {
572                 CDEBUG(D_INFO|D_WARNING, "%s Sequence Manager\n",
573                        (is_srv ? "Server" : "Controller"));
574         }
575         return rc;
576 }
577 EXPORT_SYMBOL(seq_server_init);
578
579 void seq_server_fini(struct lu_server_seq *seq,
580                      const struct lu_context *ctx)
581 {
582         ENTRY;
583
584         if (seq->lss_md_service != NULL) {
585                 ptlrpc_unregister_service(seq->lss_md_service);
586                 seq->lss_md_service = NULL;
587         }
588
589         if (seq->lss_dt_service != NULL) {
590                 ptlrpc_unregister_service(seq->lss_dt_service);
591                 seq->lss_dt_service = NULL;
592         }
593
594         seq_server_proc_fini(seq);
595         seq_store_fini(seq, ctx);
596
597         if (seq->lss_dev != NULL) {
598                 lu_device_put(&seq->lss_dev->dd_lu_dev);
599                 seq->lss_dev = NULL;
600         }
601
602         EXIT;
603 }
604 EXPORT_SYMBOL(seq_server_fini);
605
606 static int fid_init(void)
607 {
608         ENTRY;
609         RETURN(0);
610 }
611
612 static int fid_fini(void)
613 {
614         ENTRY;
615         RETURN(0);
616 }
617
618 static int __init fid_mod_init(void)
619 {
620         /* init caches if any */
621         fid_init();
622         return 0;
623 }
624
625 static void __exit fid_mod_exit(void)
626 {
627         /* free caches if any */
628         fid_fini();
629         return;
630 }
631
632 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
633 MODULE_DESCRIPTION("Lustre FID Module");
634 MODULE_LICENSE("GPL");
635
636 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
637 #endif