Whamcloud - gitweb
- cleanups in FLD, added client API and server API
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_fid.h>
47 #include "fid_internal.h"
48
49 /* client seq mgr interface */
50 static int 
51 seq_client_alloc_common(struct lu_client_seq *seq, 
52                         struct lu_range *seq_ran,
53                         __u32 seq_op)
54 {
55         __u32 *op;
56         struct lu_range *range;
57         struct ptlrpc_request *req;
58         int ran_size = sizeof(*range);
59         int rc, size[] = {sizeof(*op), ran_size};
60         int repsize[] = {ran_size};
61         ENTRY;
62
63         req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
64                               LUSTRE_MDS_VERSION, SEQ_QUERY,
65                               2, size, NULL);
66         if (req == NULL)
67                 RETURN(-ENOMEM);
68
69         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
70         *op = seq_op;
71
72         range = lustre_msg_buf(req->rq_reqmsg, 1, ran_size);
73         *range = *seq_ran;
74
75         req->rq_replen = lustre_msg_size(1, repsize);
76         req->rq_request_portal = MDS_SEQ_PORTAL;
77         rc = ptlrpc_queue_wait(req);
78         if (rc)
79                 GOTO(out_req, rc);
80
81         range = lustre_swab_repbuf(req, 0, sizeof(*range),
82                                    lustre_swab_lu_range);
83
84         LASSERT(range != NULL);
85         *seq_ran = *range;
86 out_req:
87         ptlrpc_req_finished(req); 
88         RETURN(rc);
89 }
90
91 /* request sequence-controller node to allocate new super-sequence. */
92 int
93 seq_client_alloc_super(struct lu_client_seq *seq)
94 {
95         int rc;
96         ENTRY;
97
98         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
99         rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
100                                      SEQ_ALLOC_SUPER);
101         if (rc == 0) {
102                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated super-sequence "
103                        "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
104                        seq->seq_cl_range.lr_end);
105         }
106         RETURN(rc);
107 }
108 EXPORT_SYMBOL(seq_client_alloc_super);
109
110 /* request sequence-controller node to allocate new meta-sequence. */
111 int
112 seq_client_alloc_meta(struct lu_client_seq *seq)
113 {
114         int rc;
115         ENTRY;
116
117         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
118         rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
119                                      SEQ_ALLOC_META);
120         if (rc == 0) {
121                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated meta-sequence "
122                        "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
123                        seq->seq_cl_range.lr_end);
124         }
125         RETURN(rc);
126 }
127 EXPORT_SYMBOL(seq_client_alloc_meta);
128
129 /* allocate new sequence for client (llite or MDC are expected to use this) */
130 int
131 seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
132 {
133         int rc;
134         ENTRY;
135
136         down(&seq->seq_sem);
137
138         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
139         LASSERT(range_is_sane(&seq->seq_cl_range));
140
141         /* if we still have free sequences in meta-sequence we allocate new seq
142          * from given range. */
143         if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
144                 *seqnr = seq->seq_cl_range.lr_start;
145                 seq->seq_cl_range.lr_start += 1;
146                 rc = 0;
147         } else {
148                 /* meta-sequence is exhausted, request MDT to allocate new
149                  * meta-sequence for us. */
150                 rc = seq_client_alloc_meta(seq);
151                 if (rc) {
152                         CERROR("can't allocate new meta-sequence, "
153                                "rc %d\n", rc);
154                 }
155                 
156                 *seqnr = seq->seq_cl_range.lr_start;
157                 seq->seq_cl_range.lr_start += 1;
158         }
159         up(&seq->seq_sem);
160
161         if (rc == 0) {
162                 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated sequence "
163                        "["LPX64"]\n", *seqnr);
164         }
165         RETURN(rc);
166 }
167 EXPORT_SYMBOL(seq_client_alloc_seq);
168
169 int
170 seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
171 {
172         int rc;
173         ENTRY;
174
175         LASSERT(fid != NULL);
176         LASSERT(fid_is_sane(&seq->seq_fid));
177         LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
178
179         down(&seq->seq_sem);
180         if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
181                 *fid = seq->seq_fid;
182                 seq->seq_fid.f_oid += 1;
183                 rc = 0;
184         } else {
185                 __u64 seqnr = 0;
186                 
187                 rc = seq_client_alloc_seq(seq, &seqnr);
188                 if (rc) {
189                         CERROR("can't allocate new sequence, "
190                                "rc %d\n", rc);
191                         GOTO(out, rc);
192                 } else {
193                         seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
194                         seq->seq_fid.f_seq = seqnr;
195                         seq->seq_fid.f_ver = 0;
196                         
197                         *fid = seq->seq_fid;
198                         seq->seq_fid.f_oid += 1;
199                 }
200         }
201         LASSERT(fid_is_sane(fid));
202         
203         CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
204                PFID3(fid));
205
206         EXIT;
207 out:
208         up(&seq->seq_sem);
209         return rc;
210 }
211 EXPORT_SYMBOL(seq_client_alloc_fid);
212
213 int 
214 seq_client_init(struct lu_client_seq *seq, 
215                 struct obd_export *exp,
216                 int flags)
217 {
218         int rc;
219         ENTRY;
220
221         LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
222                          LUSTRE_CLI_SEQ_SERVER));
223
224         seq->seq_flags = flags;
225         fid_zero(&seq->seq_fid);
226         sema_init(&seq->seq_sem, 1);
227         
228         seq->seq_cl_range.lr_end = 0;
229         seq->seq_cl_range.lr_start = 0;
230         
231         if (exp != NULL)
232                 seq->seq_exp = class_export_get(exp);
233
234         if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
235                 __u64 seqnr = 0;
236                 
237                 /* client (llite or MDC) init case, we need new sequence from
238                  * MDT. This will allocate new meta-sequemce first, because seq
239                  * range in init state and looks the same as exhausted. */
240                 rc = seq_client_alloc_seq(seq, &seqnr);
241                 if (rc) {
242                         CERROR("can't allocate new sequence, rc %d\n", rc);
243                         GOTO(out, rc);
244                 } else {
245                         seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
246                         seq->seq_fid.f_seq = seqnr;
247                         seq->seq_fid.f_ver = 0;
248                 }
249
250                 LASSERT(fid_is_sane(&seq->seq_fid));
251         } else {
252                 /* check if this is controller node is trying to init client. */
253                 if (seq->seq_exp) {
254                         /* MDT uses client seq manager to talk to sequence
255                          * controller, and thus, we need super-sequence. */
256                         rc = seq_client_alloc_super(seq);
257                 } else {
258                         rc = 0;
259                 }
260         }
261
262         EXIT;
263 out:
264         if (rc)
265                 seq_client_fini(seq);
266         else
267                 CDEBUG(D_INFO, "Client Sequence Manager initialized\n");
268         return rc;
269 }
270 EXPORT_SYMBOL(seq_client_init);
271
272 void seq_client_fini(struct lu_client_seq *seq)
273 {
274         ENTRY;
275         if (seq->seq_exp != NULL) {
276                 class_export_put(seq->seq_exp);
277                 seq->seq_exp = NULL;
278         }
279         CDEBUG(D_INFO, "Client Sequence Manager finalized\n");
280         EXIT;
281 }
282 EXPORT_SYMBOL(seq_client_fini);
283
284 #ifdef __KERNEL__
285 /* server side seq mgr stuff */
286 static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
287         LUSTRE_SEQ_SPACE_START,
288         LUSTRE_SEQ_SPACE_LIMIT
289 };
290
291 static const struct lu_range LUSTRE_SEQ_META_INIT = {
292         0,
293         0
294 };
295
296 static int
297 seq_server_write_state(struct lu_server_seq *seq,
298                        const struct lu_context *ctx)
299 {
300         int rc = 0;
301         ENTRY;
302
303         /* XXX: here should be calling struct dt_device methods to write
304          * sequence state to backing store. */
305
306         RETURN(rc);
307 }
308
309 static int
310 seq_server_read_state(struct lu_server_seq *seq,
311                       const struct lu_context *ctx)
312 {
313         int rc = -ENODATA;
314         ENTRY;
315         
316         /* XXX: here should be calling struct dt_device methods to read the
317          * sequence state from backing store. */
318
319         RETURN(rc);
320 }
321
322 static int
323 seq_server_alloc_super(struct lu_server_seq *seq,
324                        struct lu_range *range)
325 {
326         struct lu_range *ss_range = &seq->seq_ss_range;
327         int rc;
328         ENTRY;
329
330         if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
331                 CWARN("super-sequence is going to exhauste soon. "
332                       "Only can allocate "LPU64" sequences\n",
333                       ss_range->lr_end - ss_range->lr_start);
334                 *range = *ss_range;
335                 ss_range->lr_start = ss_range->lr_end;
336                 rc = 0;
337         } else if (ss_range->lr_start >= ss_range->lr_end) {
338                 CERROR("super-sequence is exhausted\n");
339                 rc = -ENOSPC;
340         } else {
341                 range->lr_start = ss_range->lr_start;
342                 ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
343                 range->lr_end = ss_range->lr_start;
344                 rc = 0;
345         }
346
347         if (rc == 0) {
348                 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated super-sequence "
349                        "["LPX64"-"LPX64"]\n", range->lr_start,
350                        range->lr_end);
351         }
352         
353         RETURN(rc);
354 }
355
356 static int
357 seq_server_alloc_meta(struct lu_server_seq *seq,
358                       struct lu_range *range)
359 {
360         struct lu_range *ms_range = &seq->seq_ms_range;
361         int rc;
362         ENTRY;
363
364         LASSERT(range_is_sane(ms_range));
365         
366         /* XXX: here should avoid cascading RPCs using kind of async
367          * preallocation when meta-sequence is close to exhausting. */
368         if (ms_range->lr_start == ms_range->lr_end) {
369                 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
370                         /* allocate new range of meta-sequences to allocate new
371                          * meta-sequence from it. */
372                         rc = seq_server_alloc_super(seq, ms_range);
373                 } else {
374                         /* request controller to allocate new super-sequence for
375                          * us.*/
376                         rc = seq_client_alloc_super(seq->seq_cli);
377                         if (rc) {
378                                 CERROR("can't allocate new super-sequence, "
379                                        "rc %d\n", rc);
380                                 RETURN(rc);
381                         }
382
383                         /* saving new range into allocation space. */
384                         *ms_range = seq->seq_cli->seq_cl_range;
385                 }
386
387                 LASSERT(ms_range->lr_start != 0);
388                 LASSERT(ms_range->lr_end > ms_range->lr_start);
389         } else {
390                 rc = 0;
391         }
392         range->lr_start = ms_range->lr_start;
393         ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
394         range->lr_end = ms_range->lr_start;
395
396         if (rc == 0) {
397                 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated meta-sequence "
398                        "["LPX64"-"LPX64"]\n", range->lr_start,
399                        range->lr_end);
400         }
401
402         RETURN(rc);
403 }
404
405 static int
406 seq_server_handle(struct lu_server_seq *seq,
407                   const struct lu_context *ctx, 
408                   struct lu_range *range,
409                   __u32 opc)
410 {
411         int rc;
412         ENTRY;
413
414         down(&seq->seq_sem);
415
416         switch (opc) {
417         case SEQ_ALLOC_SUPER:
418                 rc = seq_server_alloc_super(seq, range);
419                 break;
420         case SEQ_ALLOC_META:
421                 rc = seq_server_alloc_meta(seq, range);
422                 break;
423         default:
424                 rc = -EINVAL;
425                 break;
426         }
427
428         if (rc)
429                 GOTO(out, rc);
430
431         rc = seq_server_write_state(seq, ctx);
432         if (rc) {
433                 CERROR("can't save state, rc = %d\n",
434                        rc);
435         }
436
437         EXIT;
438 out:
439         up(&seq->seq_sem);
440         return rc;
441 }
442
443 static int
444 seq_req_handle0(const struct lu_context *ctx,
445                 struct lu_server_seq *seq, 
446                 struct ptlrpc_request *req) 
447 {
448         struct lu_range *in;
449         struct lu_range *out;
450         int size = sizeof(*in);
451         __u32 *opt;
452         int rc;
453         ENTRY;
454
455         rc = lustre_pack_reply(req, 1, &size, NULL);
456         if (rc)
457                 RETURN(rc);
458
459         rc = -EPROTO;
460         opt = lustre_swab_reqbuf(req, 0, sizeof(*opt),
461                                  lustre_swab_generic_32s);
462         if (opt != NULL) {
463                 in = lustre_swab_reqbuf(req, 1, sizeof(*in),
464                                         lustre_swab_lu_range);
465                 if (in != NULL) {
466                         out = lustre_msg_buf(req->rq_repmsg, 
467                                              0, sizeof(*out));
468                         LASSERT(out != NULL);
469
470                         *out = *in;
471                         rc = seq_server_handle(seq, ctx, out, *opt);
472                 } else {
473                         CERROR("Cannot unpack seq range\n");
474                 }
475         } else {
476                 CERROR("Cannot unpack option\n");
477         }
478         RETURN(rc);
479 }
480
481 static int 
482 seq_req_handle(struct ptlrpc_request *req) 
483 {
484         int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
485         const struct lu_context *ctx;
486         struct lu_site    *site;
487         int rc = -EPROTO;
488         ENTRY;
489
490         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
491         
492         ctx = req->rq_svc_thread->t_ctx;
493         LASSERT(ctx != NULL);
494         LASSERT(ctx->lc_thread == req->rq_svc_thread);
495         if (req->rq_reqmsg->opc == SEQ_QUERY) {
496                 if (req->rq_export != NULL) {
497                         struct obd_device *obd;
498
499                         obd = req->rq_export->exp_obd;
500                         site = obd->obd_lu_dev->ld_site;
501                         LASSERT(site != NULL);
502                         
503                         rc = seq_req_handle0(ctx, site->ls_server_seq, req);
504                 } else {
505                         CERROR("Unconnected request\n");
506                         req->rq_status = -ENOTCONN;
507                         GOTO(out, rc = -ENOTCONN);
508                 }
509         } else {
510                 CERROR("Wrong opcode: %d\n",
511                        req->rq_reqmsg->opc);
512                 req->rq_status = -ENOTSUPP;
513                 rc = ptlrpc_error(req);
514                 RETURN(rc);
515         }
516
517         EXIT;
518 out:
519         target_send_reply(req, rc, fail);
520         return 0;
521
522
523 int
524 seq_server_init(struct lu_server_seq *seq,
525                 struct lu_client_seq *cli,
526                 const struct lu_context  *ctx,
527                 struct dt_device *dev,
528                 int flags) 
529 {
530         int rc; 
531         ENTRY;
532
533         struct ptlrpc_service_conf seq_conf = { 
534                 .psc_nbufs = MDS_NBUFS, 
535                 .psc_bufsize = MDS_BUFSIZE, 
536                 .psc_max_req_size = MDS_MAXREQSIZE,
537                 .psc_max_reply_size = MDS_MAXREPSIZE,
538                 .psc_req_portal = MDS_SEQ_PORTAL,
539                 .psc_rep_portal = MDC_REPLY_PORTAL,
540                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, 
541                 .psc_num_threads = SEQ_NUM_THREADS
542         };
543
544         LASSERT(dev != NULL);
545         LASSERT(cli != NULL);
546         
547         LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
548                          LUSTRE_SRV_SEQ_REGULAR));
549
550         seq->seq_dev = dev;
551         seq->seq_cli = cli;
552         seq->seq_flags = flags;
553         sema_init(&seq->seq_sem, 1);
554
555         lu_device_get(&seq->seq_dev->dd_lu_dev);
556
557         /* request backing store for saved sequence info */
558         rc = seq_server_read_state(seq, ctx);
559         if (rc == -ENODATA) {
560                 /* first run, no state on disk, init all seqs */
561                 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
562                         /* init super seq by start values on sequence-controller
563                          * node.*/
564                         seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
565                 } else {
566                         /* take super-seq from client seq mgr */
567                         LASSERT(range_is_sane(&cli->seq_cl_range));
568                         seq->seq_ss_range = cli->seq_cl_range;
569                 }
570
571                 /* init meta-sequence by start values and get ready for
572                  * allocating it for clients. */
573                 seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
574
575                 /* save init seq to backing store. */
576                 rc = seq_server_write_state(seq, ctx);
577                 if (rc) {
578                         CERROR("can't write sequence state, "
579                                "rc = %d\n", rc);
580                         GOTO(out, rc);
581                 }
582         } else if (rc) {
583                 CERROR("can't read sequence state, rc = %d\n",
584                        rc);
585                 GOTO(out, rc);
586         }
587
588         seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
589                                                  seq_req_handle,
590                                                  LUSTRE_SEQ0_NAME,
591                                                  seq->seq_proc_entry, 
592                                                  NULL); 
593         if (seq->seq_service != NULL)
594                 rc = ptlrpc_start_threads(NULL, seq->seq_service,
595                                           LUSTRE_SEQ0_NAME); 
596         else 
597                 rc = -ENOMEM; 
598
599         EXIT;
600
601 out:
602         if (rc)
603                 seq_server_fini(seq, ctx);
604         else
605                 CDEBUG(D_INFO, "Server Sequence Manager initialized\n");
606         return rc;
607
608 EXPORT_SYMBOL(seq_server_init);
609
610 void
611 seq_server_fini(struct lu_server_seq *seq,
612                 const struct lu_context *ctx) 
613 {
614         int rc;
615
616         if (seq->seq_service != NULL) {
617                 ptlrpc_unregister_service(seq->seq_service);
618                 seq->seq_service = NULL;
619         }
620
621         if (seq->seq_dev != NULL) {
622                 rc = seq_server_write_state(seq, ctx);
623                 if (rc) {
624                         CERROR("can't save sequence state, "
625                                "rc = %d\n", rc);
626                 }
627                 lu_device_put(&seq->seq_dev->dd_lu_dev);
628                 seq->seq_dev = NULL;
629         }
630         
631         CDEBUG(D_INFO, "Server Sequence Manager finalized\n");
632 }
633 EXPORT_SYMBOL(seq_server_fini);
634
635 static int fid_init(void)
636 {
637         ENTRY;
638         CDEBUG(D_INFO, "Lustre Sequence Manager\n");
639         RETURN(0);
640 }
641
642 static int fid_fini(void)
643 {
644         ENTRY;
645         RETURN(0);
646 }
647
648 static int 
649 __init fid_mod_init(void) 
650
651 {
652         /* init caches if any */
653         fid_init();
654         return 0;
655 }
656
657 static void 
658 __exit fid_mod_exit(void) 
659 {
660         /* free caches if any */
661         fid_fini();
662         return;
663 }
664
665 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
666 MODULE_DESCRIPTION("Lustre FID Module");
667 MODULE_LICENSE("GPL");
668
669 cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);
670 #endif