Whamcloud - gitweb
- mostly changes according to Mike's CODEINSP;
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52  * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
54         (0x400),
55         ((__u64)~0ULL)
56 };
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
58
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
61         0,
62         0
63 };
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
65
66 /* assigns client to sequence controller node */
67 int seq_server_set_cli(struct lu_server_seq *seq,
68                        struct lu_client_seq *cli,
69                        const struct lu_context *ctx)
70 {
71         int rc = 0;
72         ENTRY;
73
74         if (cli == NULL) {
75                 CDEBUG(D_INFO|D_WARNING, "%s: detached "
76                        "sequence mgr client %s\n", seq->seq_name,
77                        cli->seq_exp->exp_client_uuid.uuid);
78                 seq->seq_cli = cli;
79                 RETURN(0);
80         }
81
82         if (seq->seq_cli) {
83                 CERROR("%s: sequence-controller is already "
84                        "assigned\n", seq->seq_name);
85                 RETURN(-EINVAL);
86         }
87
88         CDEBUG(D_INFO|D_WARNING, "%s: attached "
89                "sequence client %s\n", seq->seq_name,
90                cli->seq_exp->exp_client_uuid.uuid);
91
92         /* asking client for new range, assign that range to ->seq_super and
93          * write seq state to backing store should be atomic. */
94         down(&seq->seq_sem);
95
96         /* assign controller */
97         seq->seq_cli = cli;
98
99         /* get new range from controller only if super-sequence is not yet
100          * initialized from backing store or something else. */
101         if (range_is_zero(&seq->seq_super)) {
102                 rc = seq_client_alloc_super(cli);
103                 if (rc) {
104                         CERROR("can't allocate super-sequence, "
105                                "rc %d\n", rc);
106                         GOTO(out_up, rc);
107                 }
108
109                 /* take super-seq from client seq mgr */
110                 LASSERT(range_is_sane(&cli->seq_range));
111
112                 seq->seq_super = cli->seq_range;
113
114                 /* save init seq to backing store. */
115                 rc = seq_store_write(seq, ctx);
116                 if (rc) {
117                         CERROR("can't write sequence state, "
118                                "rc = %d\n", rc);
119                 }
120         }
121
122         EXIT;
123 out_up:
124         up(&seq->seq_sem);
125         return rc;
126 }
127 EXPORT_SYMBOL(seq_server_set_cli);
128
129 /* on controller node, allocate new super sequence for regular sequence
130  * server. */
131 static int __seq_server_alloc_super(struct lu_server_seq *seq,
132                                     struct lu_range *range,
133                                     const struct lu_context *ctx)
134 {
135         struct lu_range *space = &seq->seq_space;
136         int rc;
137         ENTRY;
138
139         LASSERT(range_is_sane(space));
140
141         if (range_space(space) < seq->seq_super_width) {
142                 CWARN("sequences space is going to exhaust soon. "
143                       "Only can allocate "LPU64" sequences\n",
144                       space->lr_end - space->lr_start);
145                 *range = *space;
146                 space->lr_start = space->lr_end;
147                 rc = 0;
148         } else if (range_is_exhausted(space)) {
149                 CERROR("sequences space is exhausted\n");
150                 rc = -ENOSPC;
151         } else {
152                 range_alloc(range, space, seq->seq_super_width);
153                 rc = 0;
154         }
155
156         if (rc == 0) {
157                 rc = seq_store_write(seq, ctx);
158                 if (rc) {
159                         CERROR("can't save state, rc = %d\n",
160                                rc);
161                 }
162         }
163
164         if (rc == 0) {
165                 CDEBUG(D_INFO, "%s: allocated super-sequence "
166                        "["LPX64"-"LPX64"]\n", seq->seq_name,
167                        range->lr_start, range->lr_end);
168         }
169
170         RETURN(rc);
171 }
172
173 static int seq_server_alloc_super(struct lu_server_seq *seq,
174                                   struct lu_range *range,
175                                   const struct lu_context *ctx)
176 {
177         int rc;
178         ENTRY;
179
180         down(&seq->seq_sem);
181         rc = __seq_server_alloc_super(seq, range, ctx);
182         up(&seq->seq_sem);
183
184         RETURN(rc);
185 }
186
187 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
188                                    struct lu_range *range,
189                                    const struct lu_context *ctx)
190 {
191         struct lu_range *super = &seq->seq_super;
192         int rc = 0;
193         ENTRY;
194
195         LASSERT(range_is_sane(super));
196
197         /* XXX: here we should avoid cascading RPCs using kind of async
198          * preallocation when meta-sequence is close to exhausting. */
199         if (range_is_exhausted(super)) {
200                 if (!seq->seq_cli) {
201                         CERROR("no seq-controller client is setup\n");
202                         RETURN(-EOPNOTSUPP);
203                 }
204
205                 rc = seq_client_alloc_super(seq->seq_cli);
206                 if (rc) {
207                         CERROR("can't allocate new super-sequence, "
208                                "rc %d\n", rc);
209                         RETURN(rc);
210                 }
211
212                 /* saving new range into allocation space. */
213                 *super = seq->seq_cli->seq_range;
214                 LASSERT(range_is_sane(super));
215         }
216         range_alloc(range, super, seq->seq_meta_width);
217
218         rc = seq_store_write(seq, ctx);
219         if (rc) {
220                 CERROR("can't save state, rc = %d\n",
221                        rc);
222         }
223
224         if (rc == 0) {
225                 CDEBUG(D_INFO, "%s: allocated meta-sequence "
226                        "["LPX64"-"LPX64"]\n", seq->seq_name,
227                        range->lr_start, range->lr_end);
228         }
229
230         RETURN(rc);
231 }
232
233 static int seq_server_alloc_meta(struct lu_server_seq *seq,
234                                  struct lu_range *range,
235                                  const struct lu_context *ctx)
236 {
237         int rc;
238         ENTRY;
239
240         down(&seq->seq_sem);
241         rc = __seq_server_alloc_meta(seq, range, ctx);
242         up(&seq->seq_sem);
243
244         RETURN(rc);
245 }
246
247 static int seq_req_handle0(const struct lu_context *ctx,
248                            struct ptlrpc_request *req)
249 {
250         int rep_buf_size[2] = { 0, };
251         struct req_capsule pill;
252         struct lu_site *site;
253         struct lu_range *out;
254         int rc = -EPROTO;
255         __u32 *opc;
256         ENTRY;
257
258         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
259         LASSERT(site != NULL);
260                         
261         req_capsule_init(&pill, req, RCL_SERVER,
262                          rep_buf_size);
263
264         req_capsule_set(&pill, &RQF_SEQ_QUERY);
265         req_capsule_pack(&pill);
266
267         opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
268         if (opc != NULL) {
269                 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
270                 if (out == NULL) {
271                         CERROR("can't get range buffer\n");
272                         GOTO(out_pill, rc= -EPROTO);
273                 }
274
275                 switch (*opc) {
276                 case SEQ_ALLOC_META:
277                         if (!site->ls_server_seq) {
278                                 CERROR("sequence-server is not initialized\n");
279                                 GOTO(out_pill, rc == -EINVAL);
280                         }
281                         rc = seq_server_alloc_meta(site->ls_server_seq, out, ctx);
282                         break;
283                 case SEQ_ALLOC_SUPER:
284                         if (!site->ls_control_seq) {
285                                 CERROR("sequence-controller is not initialized\n");
286                                 GOTO(out_pill, rc == -EINVAL);
287                         }
288                         rc = seq_server_alloc_super(site->ls_control_seq, out, ctx);
289                         break;
290                 default:
291                         CERROR("wrong opc 0x%x\n", *opc);
292                         break;
293                 }
294         } else {
295                 CERROR("cannot unpack client request\n");
296         }
297
298 out_pill:
299         EXIT;
300         req_capsule_fini(&pill);
301         return rc;
302 }
303
304 static int seq_req_handle(struct ptlrpc_request *req)
305 {
306         int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
307         const struct lu_context *ctx;
308         int rc = -EPROTO;
309         ENTRY;
310
311         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
312
313         ctx = req->rq_svc_thread->t_ctx;
314         LASSERT(ctx != NULL);
315         LASSERT(ctx->lc_thread == req->rq_svc_thread);
316         if (req->rq_reqmsg->opc == SEQ_QUERY) {
317                 if (req->rq_export != NULL) {
318                         rc = seq_req_handle0(ctx, req);
319                 } else {
320                         CERROR("Unconnected request\n");
321                         req->rq_status = -ENOTCONN;
322                 }
323         } else {
324                 CERROR("Wrong opcode: %d\n",
325                        req->rq_reqmsg->opc);
326                 req->rq_status = -ENOTSUPP;
327                 rc = ptlrpc_error(req);
328                 RETURN(rc);
329         }
330
331         target_send_reply(req, rc, fail);
332         RETURN(0);
333 }
334
335 #ifdef LPROCFS
336 static int seq_server_proc_init(struct lu_server_seq *seq)
337 {
338         int rc;
339         ENTRY;
340
341         seq->seq_proc_dir = lprocfs_register(seq->seq_name,
342                                              proc_lustre_root,
343                                              NULL, NULL);
344         if (IS_ERR(seq->seq_proc_dir)) {
345                 CERROR("LProcFS failed in seq-init\n");
346                 rc = PTR_ERR(seq->seq_proc_dir);
347                 GOTO(err, rc);
348         }
349
350         seq->seq_proc_entry = lprocfs_register("services",
351                                                seq->seq_proc_dir,
352                                                NULL, NULL);
353         if (IS_ERR(seq->seq_proc_entry)) {
354                 CERROR("LProcFS failed in seq-init\n");
355                 rc = PTR_ERR(seq->seq_proc_entry);
356                 GOTO(err_type, rc);
357         }
358
359         rc = lprocfs_add_vars(seq->seq_proc_dir,
360                               seq_server_proc_list, seq);
361         if (rc) {
362                 CERROR("can't init sequence manager "
363                        "proc, rc %d\n", rc);
364                 GOTO(err_entry, rc);
365         }
366
367         RETURN(0);
368
369 err_entry:
370         lprocfs_remove(seq->seq_proc_entry);
371 err_type:
372         lprocfs_remove(seq->seq_proc_dir);
373 err:
374         seq->seq_proc_dir = NULL;
375         seq->seq_proc_entry = NULL;
376         return rc;
377 }
378
379 static void seq_server_proc_fini(struct lu_server_seq *seq)
380 {
381         ENTRY;
382         if (seq->seq_proc_entry) {
383                 lprocfs_remove(seq->seq_proc_entry);
384                 seq->seq_proc_entry = NULL;
385         }
386
387         if (seq->seq_proc_dir) {
388                 lprocfs_remove(seq->seq_proc_dir);
389                 seq->seq_proc_dir = NULL;
390         }
391         EXIT;
392 }
393 #endif
394
395 int seq_server_init(struct lu_server_seq *seq,
396                     struct dt_device *dev,
397                     const char *uuid,
398                     lu_server_type_t type,
399                     const struct lu_context *ctx)
400 {
401         int rc, portal = (type == LUSTRE_SEQ_SRV) ?
402                 SEQ_SRV_PORTAL : SEQ_CTLR_PORTAL;
403
404         struct ptlrpc_service_conf seq_conf = {
405                 .psc_nbufs = MDS_NBUFS,
406                 .psc_bufsize = MDS_BUFSIZE,
407                 .psc_max_req_size = MDS_MAXREQSIZE,
408                 .psc_max_reply_size = MDS_MAXREPSIZE,
409                 .psc_req_portal = portal,
410                 .psc_rep_portal = MDC_REPLY_PORTAL,
411                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
412                 .psc_num_threads = SEQ_NUM_THREADS,
413                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
414         };
415         ENTRY;
416
417         LASSERT(dev != NULL);
418         LASSERT(uuid != NULL);
419
420         seq->seq_dev = dev;
421         seq->seq_cli = NULL;
422         seq->seq_type = type;
423         sema_init(&seq->seq_sem, 1);
424
425         seq->seq_super_width = LUSTRE_SEQ_SUPER_WIDTH;
426         seq->seq_meta_width = LUSTRE_SEQ_META_WIDTH;
427
428         snprintf(seq->seq_name, sizeof(seq->seq_name), "%s-%s-%s",
429                  LUSTRE_SEQ_NAME, (type == LUSTRE_SEQ_SRV ? "srv" : "ctl"),
430                  uuid);
431
432         seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
433         seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
434
435         lu_device_get(&seq->seq_dev->dd_lu_dev);
436
437         rc = seq_store_init(seq, ctx);
438         if (rc)
439                 GOTO(out, rc);
440
441         /* request backing store for saved sequence info */
442         rc = seq_store_read(seq, ctx);
443         if (rc == -ENODATA) {
444                 if (type == LUSTRE_SEQ_SRV) {
445                         CDEBUG(D_INFO|D_WARNING, "%s: no data on "
446                                "disk found, wait for controller "
447                                "attach\n", seq->seq_name);
448                 } else {
449                         CDEBUG(D_INFO|D_WARNING, "%s: no data on "
450                                "disk found, this is first controller "
451                                "run\n", seq->seq_name);
452                 }
453         } else if (rc) {
454                 CERROR("can't read sequence state, rc = %d\n",
455                        rc);
456                 GOTO(out, rc);
457         }
458
459 #ifdef LPROCFS
460         rc  = seq_server_proc_init(seq);
461         if (rc)
462                 GOTO(out, rc);
463 #endif
464
465         seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
466                                                  seq_req_handle,
467                                                  LUSTRE_SEQ_NAME,
468                                                  seq->seq_proc_entry,
469                                                  NULL);
470         if (seq->seq_service != NULL)
471                 rc = ptlrpc_start_threads(NULL, seq->seq_service,
472                                           LUSTRE_SEQ_NAME);
473         else
474                 rc = -ENOMEM;
475
476         EXIT;
477
478 out:
479         if (rc) {
480                 seq_server_fini(seq, ctx);
481         } else {
482                 CDEBUG(D_INFO|D_WARNING, "%s Sequence Manager\n",
483                        (type == LUSTRE_SEQ_SRV ? "Server" : "Controller"));
484         }
485         return rc;
486 }
487 EXPORT_SYMBOL(seq_server_init);
488
489 void seq_server_fini(struct lu_server_seq *seq,
490                      const struct lu_context *ctx)
491 {
492         ENTRY;
493
494         if (seq->seq_service != NULL) {
495                 ptlrpc_unregister_service(seq->seq_service);
496                 seq->seq_service = NULL;
497         }
498
499 #ifdef LPROCFS
500         seq_server_proc_fini(seq);
501 #endif
502
503         seq_store_fini(seq, ctx);
504
505         if (seq->seq_dev != NULL) {
506                 lu_device_put(&seq->seq_dev->dd_lu_dev);
507                 seq->seq_dev = NULL;
508         }
509
510         EXIT;
511 }
512 EXPORT_SYMBOL(seq_server_fini);
513
514 static int fid_init(void)
515 {
516         ENTRY;
517         RETURN(0);
518 }
519
520 static int fid_fini(void)
521 {
522         ENTRY;
523         RETURN(0);
524 }
525
526 static int __init fid_mod_init(void)
527 {
528         /* init caches if any */
529         fid_init();
530         return 0;
531 }
532
533 static void __exit fid_mod_exit(void)
534 {
535         /* free caches if any */
536         fid_fini();
537         return;
538 }
539
540 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
541 MODULE_DESCRIPTION("Lustre FID Module");
542 MODULE_LICENSE("GPL");
543
544 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
545 #endif