Whamcloud - gitweb
- sequence width on clients, super-width and meta-width on servers are dynamic now...
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52  * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
54         (0x400),
55         ((__u64)~0ULL)
56 };
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
58
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
61         0,
62         0
63 };
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
65
66 static int
67 seq_server_write_state(struct lu_server_seq *seq,
68                        const struct lu_context *ctx)
69 {
70         int rc = 0;
71         ENTRY;
72
73         /* XXX: here should be calling struct dt_device methods to write
74          * sequence state to backing store. */
75
76         RETURN(rc);
77 }
78
79 static int
80 seq_server_read_state(struct lu_server_seq *seq,
81                       const struct lu_context *ctx)
82 {
83         int rc = -ENODATA;
84         ENTRY;
85         
86         /* XXX: here should be calling struct dt_device methods to read the
87          * sequence state from backing store. */
88
89         RETURN(rc);
90 }
91
92 /* assigns client to sequence controller node */
93 int
94 seq_server_set_ctlr(struct lu_server_seq *seq,
95                     struct lu_client_seq *cli,
96                     const struct lu_context *ctx)
97 {
98         int rc = 0;
99         ENTRY;
100         
101         LASSERT(cli != NULL);
102
103         if (seq->seq_cli) {
104                 CERROR("SEQ-MGR(srv): sequence-controller "
105                        "is already assigned\n");
106                 RETURN(-EINVAL);
107         }
108
109         CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): assign "
110                "sequence controller client %s\n",
111                cli->seq_exp->exp_client_uuid.uuid);
112
113         down(&seq->seq_sem);
114         
115         /* assign controller */
116         seq->seq_cli = cli;
117
118         /* get new range from controller only if super-sequence is not yet
119          * initialized from backing store or something else. */
120         if (range_is_zero(&seq->seq_super)) {
121                 /* release sema to avoid deadlock for case we're asking our
122                  * selves. */
123                 up(&seq->seq_sem);
124                 rc = seq_client_alloc_super(cli);
125                 down(&seq->seq_sem);
126                 
127                 if (rc) {
128                         CERROR("can't allocate super-sequence, "
129                                "rc %d\n", rc);
130                         RETURN(rc);
131                 }
132
133                 /* take super-seq from client seq mgr */
134                 LASSERT(range_is_sane(&cli->seq_range));
135
136                 seq->seq_super = cli->seq_range;
137
138                 /* save init seq to backing store. */
139                 rc = seq_server_write_state(seq, ctx);
140                 if (rc) {
141                         CERROR("can't write sequence state, "
142                                "rc = %d\n", rc);
143                 }
144         }
145         up(&seq->seq_sem);
146         RETURN(rc);
147 }
148 EXPORT_SYMBOL(seq_server_set_ctlr);
149
150 /* on controller node, allocate new super sequence for regular sequnece
151  * server. */
152 static int
153 __seq_server_alloc_super(struct lu_server_seq *seq,
154                          struct lu_range *range,
155                          const struct lu_context *ctx)
156 {
157         struct lu_range *space = &seq->seq_space;
158         int rc;
159         ENTRY;
160
161         LASSERT(range_is_sane(space));
162         
163         if (range_space(space) < seq->seq_super_width) {
164                 CWARN("sequences space is going to exhaust soon. "
165                       "Only can allocate "LPU64" sequences\n",
166                       space->lr_end - space->lr_start);
167                 *range = *space;
168                 space->lr_start = space->lr_end;
169                 rc = 0;
170         } else if (range_is_exhausted(space)) {
171                 CERROR("sequences space is exhausted\n");
172                 rc = -ENOSPC;
173         } else {
174                 range_alloc(range, space, seq->seq_super_width);
175                 rc = 0;
176         }
177
178         rc = seq_server_write_state(seq, ctx);
179         if (rc) {
180                 CERROR("can't save state, rc = %d\n",
181                        rc);
182         }
183
184         if (rc == 0) {
185                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated super-sequence "
186                        "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
187         }
188         
189         RETURN(rc);
190 }
191
192 static int
193 seq_server_alloc_super(struct lu_server_seq *seq,
194                        struct lu_range *range,
195                        const struct lu_context *ctx)
196 {
197         int rc;
198         ENTRY;
199
200         down(&seq->seq_sem);
201         rc = __seq_server_alloc_super(seq, range, ctx);
202         up(&seq->seq_sem);
203         
204         RETURN(rc);
205 }
206
207 static int
208 __seq_server_alloc_meta(struct lu_server_seq *seq,
209                         struct lu_range *range,
210                         const struct lu_context *ctx)
211 {
212         struct lu_range *super = &seq->seq_super;
213         int rc = 0;
214         ENTRY;
215
216         LASSERT(range_is_sane(super));
217
218         /* XXX: here we should avoid cascading RPCs using kind of async
219          * preallocation when meta-sequence is close to exhausting. */
220         if (range_is_exhausted(super)) {
221                 if (!seq->seq_cli) {
222                         CERROR("no seq-controller client is setup\n");
223                         RETURN(-EOPNOTSUPP);
224                 }
225
226                 /* allocate new super-sequence. */
227                 up(&seq->seq_sem);
228                 rc = seq_client_alloc_super(seq->seq_cli);
229                 down(&seq->seq_sem);
230                 if (rc) {
231                         CERROR("can't allocate new super-sequence, "
232                                "rc %d\n", rc);
233                         RETURN(rc);
234                 }
235
236                 if (seq->seq_cli->seq_range.lr_start > super->lr_start) {
237                         /* saving new range into allocation space. */
238                         *super = seq->seq_cli->seq_range;
239                         LASSERT(range_is_sane(super));
240                 } else {
241                         /* XXX: race is catched, ignore what we have from
242                          * controller node. The only issue is that controller
243                          * node has now this super-sequence lost, what makes
244                          * sequences space smaller. */
245                         CWARN("SEQ-MGR(srv): race is cached, reject "
246                               "allocated super-sequence\n");
247                         RETURN(0);
248                 }
249         }
250         range_alloc(range, super, seq->seq_meta_width);
251
252         rc = seq_server_write_state(seq, ctx);
253         if (rc) {
254                 CERROR("can't save state, rc = %d\n",
255                        rc);
256         }
257
258         if (rc == 0) {
259                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated meta-sequence "
260                        "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
261         }
262
263         RETURN(rc);
264 }
265
266 static int
267 seq_server_alloc_meta(struct lu_server_seq *seq,
268                       struct lu_range *range,
269                       const struct lu_context *ctx)
270 {
271         int rc;
272         ENTRY;
273
274         down(&seq->seq_sem);
275         rc = __seq_server_alloc_meta(seq, range, ctx);
276         up(&seq->seq_sem);
277         
278         RETURN(rc);
279 }
280
281 static int
282 seq_server_handle(struct lu_server_seq *seq,
283                   const struct lu_context *ctx, 
284                   struct lu_range *range,
285                   __u32 opc)
286 {
287         int rc;
288         ENTRY;
289
290         switch (opc) {
291         case SEQ_ALLOC_SUPER:
292                 rc = seq_server_alloc_super(seq, range, ctx);
293                 break;
294         case SEQ_ALLOC_META:
295                 rc = seq_server_alloc_meta(seq, range, ctx);
296                 break;
297         default:
298                 rc = -EINVAL;
299                 break;
300         }
301
302         RETURN(rc);
303 }
304
305 static int
306 seq_req_handle0(const struct lu_context *ctx,
307                 struct lu_server_seq *seq, 
308                 struct ptlrpc_request *req) 
309 {
310         int rep_buf_size[2] = { 0, };
311         struct req_capsule pill;
312         struct lu_range *out;
313         int rc = -EPROTO;
314         __u32 *opc;
315         ENTRY;
316
317         req_capsule_init(&pill, req, RCL_SERVER,
318                          rep_buf_size);
319
320         req_capsule_set(&pill, &RQF_SEQ_QUERY);
321         req_capsule_pack(&pill);
322
323         opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
324         if (opc != NULL) {
325                 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
326                 if (out == NULL) {
327                         CERROR("can't get range buffer\n");
328                         GOTO(out_pill, rc= -EPROTO);
329                 }
330                 rc = seq_server_handle(seq, ctx, out, *opc);
331         } else {
332                 CERROR("cannot unpack client request\n");
333         }
334
335 out_pill:
336         EXIT;
337         req_capsule_fini(&pill);
338         return rc;
339 }
340
341 static int 
342 seq_req_handle(struct ptlrpc_request *req) 
343 {
344         int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
345         const struct lu_context *ctx;
346         struct lu_site    *site;
347         int rc = -EPROTO;
348         ENTRY;
349
350         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
351         
352         ctx = req->rq_svc_thread->t_ctx;
353         LASSERT(ctx != NULL);
354         LASSERT(ctx->lc_thread == req->rq_svc_thread);
355         if (req->rq_reqmsg->opc == SEQ_QUERY) {
356                 if (req->rq_export != NULL) {
357                         struct obd_device *obd;
358
359                         obd = req->rq_export->exp_obd;
360                         site = obd->obd_lu_dev->ld_site;
361                         LASSERT(site != NULL);
362                         
363                         rc = seq_req_handle0(ctx, site->ls_server_seq, req);
364                 } else {
365                         CERROR("Unconnected request\n");
366                         req->rq_status = -ENOTCONN;
367                         GOTO(out, rc = -ENOTCONN);
368                 }
369         } else {
370                 CERROR("Wrong opcode: %d\n",
371                        req->rq_reqmsg->opc);
372                 req->rq_status = -ENOTSUPP;
373                 rc = ptlrpc_error(req);
374                 RETURN(rc);
375         }
376
377         EXIT;
378 out:
379         target_send_reply(req, rc, fail);
380         return 0;
381
382
383 #ifdef LPROCFS
384 static int
385 seq_server_proc_init(struct lu_server_seq *seq)
386 {
387         int rc;
388         ENTRY;
389
390         seq->seq_proc_dir = lprocfs_register(seq->seq_name,
391                                              proc_lustre_root,
392                                              NULL, NULL);
393         if (IS_ERR(seq->seq_proc_dir)) {
394                 CERROR("LProcFS failed in seq-init\n");
395                 rc = PTR_ERR(seq->seq_proc_dir);
396                 GOTO(err, rc);
397         }
398
399         seq->seq_proc_entry = lprocfs_register("services",
400                                                seq->seq_proc_dir,
401                                                NULL, NULL);
402         if (IS_ERR(seq->seq_proc_entry)) {
403                 CERROR("LProcFS failed in seq-init\n");
404                 rc = PTR_ERR(seq->seq_proc_entry);
405                 GOTO(err_type, rc);
406         }
407
408         rc = lprocfs_add_vars(seq->seq_proc_dir,
409                               seq_server_proc_list, seq);
410         if (rc) {
411                 CERROR("can't init sequence manager "
412                        "proc, rc %d\n", rc);
413         }
414
415         RETURN(0);
416
417 err_type:
418         lprocfs_remove(seq->seq_proc_dir);
419 err:
420         seq->seq_proc_dir = NULL;
421         seq->seq_proc_entry = NULL;
422         return rc;
423 }
424
425 static void
426 seq_server_proc_fini(struct lu_server_seq *seq)
427 {
428         ENTRY;
429         if (seq->seq_proc_entry) {
430                 lprocfs_remove(seq->seq_proc_entry);
431                 seq->seq_proc_entry = NULL;
432         }
433
434         if (seq->seq_proc_dir) {
435                 lprocfs_remove(seq->seq_proc_dir);
436                 seq->seq_proc_dir = NULL;
437         }
438         EXIT;
439 }
440 #endif
441
442 int
443 seq_server_init(struct lu_server_seq *seq,
444                 struct dt_device *dev,
445                 const char *uuid,
446                 const struct lu_context *ctx) 
447 {
448         int rc; 
449         struct ptlrpc_service_conf seq_conf = { 
450                 .psc_nbufs = MDS_NBUFS, 
451                 .psc_bufsize = MDS_BUFSIZE, 
452                 .psc_max_req_size = MDS_MAXREQSIZE,
453                 .psc_max_reply_size = MDS_MAXREPSIZE,
454                 .psc_req_portal = MDS_SEQ_PORTAL,
455                 .psc_rep_portal = MDC_REPLY_PORTAL,
456                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, 
457                 .psc_num_threads = SEQ_NUM_THREADS
458         };
459         ENTRY;
460
461         LASSERT(dev != NULL);
462         LASSERT(uuid != NULL);
463
464         seq->seq_dev = dev;
465         seq->seq_cli = NULL;
466         sema_init(&seq->seq_sem, 1);
467
468         seq->seq_super_width = LUSTRE_SEQ_SUPER_WIDTH;
469         seq->seq_meta_width = LUSTRE_SEQ_META_WIDTH;
470
471         snprintf(seq->seq_name, sizeof(seq->seq_name),
472                  "%s-%s", LUSTRE_SEQ_NAME, uuid);
473         
474         seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
475         seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
476         
477         lu_device_get(&seq->seq_dev->dd_lu_dev);
478
479         /* request backing store for saved sequence info */
480         rc = seq_server_read_state(seq, ctx);
481         if (rc == -ENODATA) {
482                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): no data on "
483                        "disk found, waiting for controller assign\n");
484         } else if (rc) {
485                 CERROR("can't read sequence state, rc = %d\n",
486                        rc);
487                 GOTO(out, rc);
488         }
489         
490 #ifdef LPROCFS
491         rc  = seq_server_proc_init(seq);
492         if (rc)
493                 GOTO(out, rc);
494 #endif
495
496         seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
497                                                  seq_req_handle,
498                                                  LUSTRE_SEQ_NAME,
499                                                  seq->seq_proc_entry, 
500                                                  NULL); 
501         if (seq->seq_service != NULL)
502                 rc = ptlrpc_start_threads(NULL, seq->seq_service,
503                                           LUSTRE_SEQ_NAME); 
504         else 
505                 rc = -ENOMEM; 
506
507         EXIT;
508
509 out:
510         if (rc) {
511 #ifdef LPROCFS
512                 seq_server_proc_fini(seq);
513 #endif
514                 seq_server_fini(seq, ctx);
515         } else {
516                 CDEBUG(D_INFO|D_WARNING, "Server Sequence "
517                        "Manager\n");
518         }
519         return rc;
520
521 EXPORT_SYMBOL(seq_server_init);
522
523 void
524 seq_server_fini(struct lu_server_seq *seq,
525                 const struct lu_context *ctx) 
526 {
527         ENTRY;
528
529         if (seq->seq_service != NULL) {
530                 ptlrpc_unregister_service(seq->seq_service);
531                 seq->seq_service = NULL;
532         }
533
534 #ifdef LPROCFS
535         seq_server_proc_fini(seq);
536 #endif
537
538         if (seq->seq_dev != NULL) {
539                 lu_device_put(&seq->seq_dev->dd_lu_dev);
540                 seq->seq_dev = NULL;
541         }
542         
543         CDEBUG(D_INFO|D_WARNING, "Server Sequence "
544                "Manager\n");
545         EXIT;
546 }
547 EXPORT_SYMBOL(seq_server_fini);
548
549 static int fid_init(void)
550 {
551         ENTRY;
552         RETURN(0);
553 }
554
555 static int fid_fini(void)
556 {
557         ENTRY;
558         RETURN(0);
559 }
560
561 static int 
562 __init fid_mod_init(void) 
563
564 {
565         /* init caches if any */
566         fid_init();
567         return 0;
568 }
569
570 static void 
571 __exit fid_mod_exit(void) 
572 {
573         /* free caches if any */
574         fid_fini();
575         return;
576 }
577
578 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
579 MODULE_DESCRIPTION("Lustre FID Module");
580 MODULE_LICENSE("GPL");
581
582 cfs_module(fid, "0.0.4", fid_mod_init, fid_mod_exit);
583 #endif