Whamcloud - gitweb
- changes in sec-mgr
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 /* client seq mgr interface */
51 static int 
52 seq_client_rpc(struct lu_client_seq *seq, 
53                struct lu_range *range,
54                __u32 opc)
55 {
56         int repsize = sizeof(struct lu_range);
57         int rc, reqsize = sizeof(__u32);
58         struct ptlrpc_request *req;
59         struct lu_range *ran;
60         __u32 *op;
61         ENTRY;
62
63         req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
64                               LUSTRE_MDS_VERSION, SEQ_QUERY,
65                               1, &reqsize, NULL);
66         if (req == NULL)
67                 RETURN(-ENOMEM);
68
69         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
70         *op = opc;
71
72         req->rq_replen = lustre_msg_size(1, &repsize);
73         req->rq_request_portal = MDS_SEQ_PORTAL;
74         rc = ptlrpc_queue_wait(req);
75         if (rc)
76                 GOTO(out_req, rc);
77
78         ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
79                                  lustre_swab_lu_range);
80
81         if (ran == NULL) {
82                 CERROR("invalid range is returned\n");
83                 GOTO(out_req, rc = -EPROTO);
84         }
85         *range = *ran;
86         EXIT;
87 out_req:
88         ptlrpc_req_finished(req); 
89         return rc;
90 }
91
92 /* request sequence-controller node to allocate new super-sequence. */
93 int
94 seq_client_alloc_super(struct lu_client_seq *seq)
95 {
96         int rc;
97         ENTRY;
98
99         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
100         rc = seq_client_rpc(seq, &seq->seq_cl_range,
101                             SEQ_ALLOC_SUPER);
102         if (rc == 0) {
103                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated super-sequence "
104                        "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
105                        seq->seq_cl_range.lr_end);
106         }
107         RETURN(rc);
108 }
109 EXPORT_SYMBOL(seq_client_alloc_super);
110
111 /* request sequence-controller node to allocate new meta-sequence. */
112 int
113 seq_client_alloc_meta(struct lu_client_seq *seq)
114 {
115         int rc;
116         ENTRY;
117
118         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
119         rc = seq_client_rpc(seq, &seq->seq_cl_range,
120                             SEQ_ALLOC_META);
121         if (rc == 0) {
122                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated meta-sequence "
123                        "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
124                        seq->seq_cl_range.lr_end);
125         }
126         RETURN(rc);
127 }
128 EXPORT_SYMBOL(seq_client_alloc_meta);
129
130 /* allocate new sequence for client (llite or MDC are expected to use this) */
131 int
132 seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
133 {
134         int rc;
135         ENTRY;
136
137         down(&seq->seq_sem);
138
139         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
140         LASSERT(range_is_sane(&seq->seq_cl_range));
141
142         /* if we still have free sequences in meta-sequence we allocate new seq
143          * from given range. */
144         if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
145                 *seqnr = seq->seq_cl_range.lr_start;
146                 seq->seq_cl_range.lr_start += 1;
147                 rc = 0;
148         } else {
149                 /* meta-sequence is exhausted, request MDT to allocate new
150                  * meta-sequence for us. */
151                 rc = seq_client_alloc_meta(seq);
152                 if (rc) {
153                         CERROR("can't allocate new meta-sequence, "
154                                "rc %d\n", rc);
155                 }
156                 
157                 *seqnr = seq->seq_cl_range.lr_start;
158                 seq->seq_cl_range.lr_start += 1;
159         }
160         up(&seq->seq_sem);
161
162         if (rc == 0) {
163                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated sequence "
164                        "["LPX64"]\n", *seqnr);
165         }
166         RETURN(rc);
167 }
168 EXPORT_SYMBOL(seq_client_alloc_seq);
169
170 int
171 seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
172 {
173         int rc;
174         ENTRY;
175
176         LASSERT(fid != NULL);
177         LASSERT(fid_is_sane(&seq->seq_fid));
178         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
179
180         down(&seq->seq_sem);
181         if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
182                 *fid = seq->seq_fid;
183                 seq->seq_fid.f_oid += 1;
184                 rc = 0;
185         } else {
186                 __u64 seqnr = 0;
187                 
188                 rc = seq_client_alloc_seq(seq, &seqnr);
189                 if (rc) {
190                         CERROR("can't allocate new sequence, "
191                                "rc %d\n", rc);
192                         GOTO(out, rc);
193                 } else {
194                         seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
195                         seq->seq_fid.f_seq = seqnr;
196                         seq->seq_fid.f_ver = 0;
197                         
198                         *fid = seq->seq_fid;
199                         seq->seq_fid.f_oid += 1;
200                 }
201         }
202         LASSERT(fid_is_sane(fid));
203         
204         CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
205                PFID3(fid));
206
207         EXIT;
208 out:
209         up(&seq->seq_sem);
210         return rc;
211 }
212 EXPORT_SYMBOL(seq_client_alloc_fid);
213
214 int 
215 seq_client_init(struct lu_client_seq *seq, 
216                 struct obd_export *exp,
217                 int flags)
218 {
219         int rc;
220         ENTRY;
221
222         LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
223                          LUSTRE_CLI_SEQ_SERVER));
224
225         seq->seq_flags = flags;
226         fid_zero(&seq->seq_fid);
227         sema_init(&seq->seq_sem, 1);
228         
229         seq->seq_cl_range.lr_end = 0;
230         seq->seq_cl_range.lr_start = 0;
231         
232         if (exp != NULL)
233                 seq->seq_exp = class_export_get(exp);
234
235         if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
236                 __u64 seqnr = 0;
237                 
238                 /* client (llite or MDC) init case, we need new sequence from
239                  * MDT. This will allocate new meta-sequemce first, because seq
240                  * range in init state and looks the same as exhausted. */
241                 rc = seq_client_alloc_seq(seq, &seqnr);
242                 if (rc) {
243                         CERROR("can't allocate new sequence, rc %d\n", rc);
244                         GOTO(out, rc);
245                 } else {
246                         seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
247                         seq->seq_fid.f_seq = seqnr;
248                         seq->seq_fid.f_ver = 0;
249                 }
250
251                 LASSERT(fid_is_sane(&seq->seq_fid));
252         } else {
253                 /* check if this is controller node is trying to init client. */
254                 if (seq->seq_exp) {
255                         /* MDT uses client seq manager to talk to sequence
256                          * controller, and thus, we need super-sequence. */
257                         rc = seq_client_alloc_super(seq);
258                 } else {
259                         rc = 0;
260                 }
261         }
262
263         EXIT;
264 out:
265         if (rc)
266                 seq_client_fini(seq);
267         else
268                 CDEBUG(D_INFO, "Client Sequence Manager initialized\n");
269         return rc;
270 }
271 EXPORT_SYMBOL(seq_client_init);
272
273 void seq_client_fini(struct lu_client_seq *seq)
274 {
275         ENTRY;
276         if (seq->seq_exp != NULL) {
277                 class_export_put(seq->seq_exp);
278                 seq->seq_exp = NULL;
279         }
280         CDEBUG(D_INFO, "Client Sequence Manager finalized\n");
281         EXIT;
282 }
283 EXPORT_SYMBOL(seq_client_fini);
284
285 #ifdef __KERNEL__
286 /* server side seq mgr stuff */
287 static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
288         LUSTRE_SEQ_SPACE_START,
289         LUSTRE_SEQ_SPACE_LIMIT
290 };
291
292 static const struct lu_range LUSTRE_SEQ_META_INIT = {
293         0,
294         0
295 };
296
297 static int
298 seq_server_write_state(struct lu_server_seq *seq,
299                        const struct lu_context *ctx)
300 {
301         int rc = 0;
302         ENTRY;
303
304         /* XXX: here should be calling struct dt_device methods to write
305          * sequence state to backing store. */
306
307         RETURN(rc);
308 }
309
310 static int
311 seq_server_read_state(struct lu_server_seq *seq,
312                       const struct lu_context *ctx)
313 {
314         int rc = -ENODATA;
315         ENTRY;
316         
317         /* XXX: here should be calling struct dt_device methods to read the
318          * sequence state from backing store. */
319
320         RETURN(rc);
321 }
322
323 static int
324 seq_server_alloc_super(struct lu_server_seq *seq,
325                        struct lu_range *range)
326 {
327         struct lu_range *ss_range = &seq->seq_ss_range;
328         int rc;
329         ENTRY;
330
331         if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
332                 CWARN("super-sequence is going to exhauste soon. "
333                       "Only can allocate "LPU64" sequences\n",
334                       ss_range->lr_end - ss_range->lr_start);
335                 *range = *ss_range;
336                 ss_range->lr_start = ss_range->lr_end;
337                 rc = 0;
338         } else if (ss_range->lr_start >= ss_range->lr_end) {
339                 CERROR("super-sequence is exhausted\n");
340                 rc = -ENOSPC;
341         } else {
342                 range->lr_start = ss_range->lr_start;
343                 ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
344                 range->lr_end = ss_range->lr_start;
345                 rc = 0;
346         }
347
348         if (rc == 0) {
349                 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated super-sequence "
350                        "["LPX64"-"LPX64"]\n", range->lr_start,
351                        range->lr_end);
352         }
353         
354         RETURN(rc);
355 }
356
357 static int
358 seq_server_alloc_meta(struct lu_server_seq *seq,
359                       struct lu_range *range)
360 {
361         struct lu_range *ms_range = &seq->seq_ms_range;
362         int rc;
363         ENTRY;
364
365         LASSERT(range_is_sane(ms_range));
366         
367         /* XXX: here should avoid cascading RPCs using kind of async
368          * preallocation when meta-sequence is close to exhausting. */
369         if (ms_range->lr_start == ms_range->lr_end) {
370                 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
371                         /* allocate new range of meta-sequences to allocate new
372                          * meta-sequence from it. */
373                         rc = seq_server_alloc_super(seq, ms_range);
374                 } else {
375                         /* request controller to allocate new super-sequence for
376                          * us.*/
377                         rc = seq_client_alloc_super(seq->seq_cli);
378                         if (rc) {
379                                 CERROR("can't allocate new super-sequence, "
380                                        "rc %d\n", rc);
381                                 RETURN(rc);
382                         }
383
384                         /* saving new range into allocation space. */
385                         *ms_range = seq->seq_cli->seq_cl_range;
386                 }
387
388                 LASSERT(ms_range->lr_start != 0);
389                 LASSERT(ms_range->lr_end > ms_range->lr_start);
390         } else {
391                 rc = 0;
392         }
393         range->lr_start = ms_range->lr_start;
394         ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
395         range->lr_end = ms_range->lr_start;
396
397         if (rc == 0) {
398                 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated meta-sequence "
399                        "["LPX64"-"LPX64"]\n", range->lr_start,
400                        range->lr_end);
401         }
402
403         RETURN(rc);
404 }
405
406 static int
407 seq_server_handle(struct lu_server_seq *seq,
408                   const struct lu_context *ctx, 
409                   struct lu_range *range,
410                   __u32 opc)
411 {
412         int rc;
413         ENTRY;
414
415         down(&seq->seq_sem);
416
417         switch (opc) {
418         case SEQ_ALLOC_SUPER:
419                 rc = seq_server_alloc_super(seq, range);
420                 break;
421         case SEQ_ALLOC_META:
422                 rc = seq_server_alloc_meta(seq, range);
423                 break;
424         default:
425                 rc = -EINVAL;
426                 break;
427         }
428
429         if (rc)
430                 GOTO(out, rc);
431
432         rc = seq_server_write_state(seq, ctx);
433         if (rc) {
434                 CERROR("can't save state, rc = %d\n",
435                        rc);
436         }
437
438         EXIT;
439 out:
440         up(&seq->seq_sem);
441         return rc;
442 }
443
444 static int
445 seq_req_handle0(const struct lu_context *ctx,
446                 struct lu_server_seq *seq, 
447                 struct ptlrpc_request *req) 
448 {
449         int rep_buf_size[2] = { 0, };
450         struct req_capsule pill;
451         struct lu_range *out;
452         int rc = -EPROTO;
453         __u32 *opc;
454         ENTRY;
455
456         req_capsule_init(&pill, req, RCL_SERVER,
457                          rep_buf_size);
458
459         req_capsule_set(&pill, &RQF_SEQ_QUERY);
460         req_capsule_pack(&pill);
461
462         opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
463         if (opc != NULL) {
464                 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
465                 if (out == NULL) {
466                         CERROR("can't get range buffer\n");
467                         GOTO(out_pill, rc= -EPROTO);
468                 }
469                 rc = seq_server_handle(seq, ctx, out, *opc);
470         } else {
471                 CERROR("cannot unpack client request\n");
472         }
473
474 out_pill:
475         EXIT;
476         req_capsule_fini(&pill);
477         return rc;
478 }
479
480 static int 
481 seq_req_handle(struct ptlrpc_request *req) 
482 {
483         int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
484         const struct lu_context *ctx;
485         struct lu_site    *site;
486         int rc = -EPROTO;
487         ENTRY;
488
489         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
490         
491         ctx = req->rq_svc_thread->t_ctx;
492         LASSERT(ctx != NULL);
493         LASSERT(ctx->lc_thread == req->rq_svc_thread);
494         if (req->rq_reqmsg->opc == SEQ_QUERY) {
495                 if (req->rq_export != NULL) {
496                         struct obd_device *obd;
497
498                         obd = req->rq_export->exp_obd;
499                         site = obd->obd_lu_dev->ld_site;
500                         LASSERT(site != NULL);
501                         
502                         rc = seq_req_handle0(ctx, site->ls_server_seq, req);
503                 } else {
504                         CERROR("Unconnected request\n");
505                         req->rq_status = -ENOTCONN;
506                         GOTO(out, rc = -ENOTCONN);
507                 }
508         } else {
509                 CERROR("Wrong opcode: %d\n",
510                        req->rq_reqmsg->opc);
511                 req->rq_status = -ENOTSUPP;
512                 rc = ptlrpc_error(req);
513                 RETURN(rc);
514         }
515
516         EXIT;
517 out:
518         target_send_reply(req, rc, fail);
519         return 0;
520
521
522 int
523 seq_server_init(struct lu_server_seq *seq,
524                 struct lu_client_seq *cli,
525                 const struct lu_context  *ctx,
526                 struct dt_device *dev,
527                 int flags) 
528 {
529         int rc; 
530         ENTRY;
531
532         struct ptlrpc_service_conf seq_conf = { 
533                 .psc_nbufs = MDS_NBUFS, 
534                 .psc_bufsize = MDS_BUFSIZE, 
535                 .psc_max_req_size = MDS_MAXREQSIZE,
536                 .psc_max_reply_size = MDS_MAXREPSIZE,
537                 .psc_req_portal = MDS_SEQ_PORTAL,
538                 .psc_rep_portal = MDC_REPLY_PORTAL,
539                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, 
540                 .psc_num_threads = SEQ_NUM_THREADS
541         };
542
543         LASSERT(dev != NULL);
544         LASSERT(cli != NULL);
545         
546         LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
547                          LUSTRE_SRV_SEQ_REGULAR));
548
549         seq->seq_dev = dev;
550         seq->seq_cli = cli;
551         seq->seq_flags = flags;
552         sema_init(&seq->seq_sem, 1);
553
554         lu_device_get(&seq->seq_dev->dd_lu_dev);
555
556         /* request backing store for saved sequence info */
557         rc = seq_server_read_state(seq, ctx);
558         if (rc == -ENODATA) {
559                 /* first run, no state on disk, init all seqs */
560                 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
561                         /* init super seq by start values on sequence-controller
562                          * node.*/
563                         seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
564                 } else {
565                         /* take super-seq from client seq mgr */
566                         LASSERT(range_is_sane(&cli->seq_cl_range));
567                         seq->seq_ss_range = cli->seq_cl_range;
568                 }
569
570                 /* init meta-sequence by start values and get ready for
571                  * allocating it for clients. */
572                 seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
573
574                 /* save init seq to backing store. */
575                 rc = seq_server_write_state(seq, ctx);
576                 if (rc) {
577                         CERROR("can't write sequence state, "
578                                "rc = %d\n", rc);
579                         GOTO(out, rc);
580                 }
581         } else if (rc) {
582                 CERROR("can't read sequence state, rc = %d\n",
583                        rc);
584                 GOTO(out, rc);
585         }
586
587         seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
588                                                  seq_req_handle,
589                                                  LUSTRE_SEQ0_NAME,
590                                                  seq->seq_proc_entry, 
591                                                  NULL); 
592         if (seq->seq_service != NULL)
593                 rc = ptlrpc_start_threads(NULL, seq->seq_service,
594                                           LUSTRE_SEQ0_NAME); 
595         else 
596                 rc = -ENOMEM; 
597
598         EXIT;
599
600 out:
601         if (rc)
602                 seq_server_fini(seq, ctx);
603         else
604                 CDEBUG(D_INFO, "Server Sequence Manager initialized\n");
605         return rc;
606
607 EXPORT_SYMBOL(seq_server_init);
608
609 void
610 seq_server_fini(struct lu_server_seq *seq,
611                 const struct lu_context *ctx) 
612 {
613         int rc;
614
615         if (seq->seq_service != NULL) {
616                 ptlrpc_unregister_service(seq->seq_service);
617                 seq->seq_service = NULL;
618         }
619
620         if (seq->seq_dev != NULL) {
621                 rc = seq_server_write_state(seq, ctx);
622                 if (rc) {
623                         CERROR("can't save sequence state, "
624                                "rc = %d\n", rc);
625                 }
626                 lu_device_put(&seq->seq_dev->dd_lu_dev);
627                 seq->seq_dev = NULL;
628         }
629         
630         CDEBUG(D_INFO, "Server Sequence Manager finalized\n");
631 }
632 EXPORT_SYMBOL(seq_server_fini);
633
634 static int fid_init(void)
635 {
636         ENTRY;
637         CDEBUG(D_INFO, "Lustre Sequence Manager\n");
638         RETURN(0);
639 }
640
641 static int fid_fini(void)
642 {
643         ENTRY;
644         RETURN(0);
645 }
646
647 static int 
648 __init fid_mod_init(void) 
649
650 {
651         /* init caches if any */
652         fid_init();
653         return 0;
654 }
655
656 static void 
657 __exit fid_mod_exit(void) 
658 {
659         /* free caches if any */
660         fid_fini();
661         return;
662 }
663
664 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
665 MODULE_DESCRIPTION("Lustre FID Module");
666 MODULE_LICENSE("GPL");
667
668 cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);
669 #endif