Whamcloud - gitweb
ec7f968bd64dd0f22258a90d9bf2fe9b0aeffb18
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* assigns client to sequence controller node */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53                        struct lu_client_seq *cli,
54                        const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         if (cli == NULL) {
60                 CDEBUG(D_INFO|D_WARNING, "%s: Detached "
61                        "sequence client %s\n", seq->lss_name,
62                        cli->lcs_name);
63                 seq->lss_cli = cli;
64                 RETURN(0);
65         }
66
67         if (seq->lss_cli) {
68                 CERROR("%s: Sequence-controller is already "
69                        "assigned\n", seq->lss_name);
70                 RETURN(-EINVAL);
71         }
72
73         CDEBUG(D_INFO|D_WARNING, "%s: Attached "
74                "sequence client %s\n", seq->lss_name,
75                cli->lcs_name);
76
77         /* asking client for new range, assign that range to ->seq_super and
78          * write seq state to backing store should be atomic. */
79         down(&seq->lss_sem);
80
81         /* assign controller */
82         seq->lss_cli = cli;
83
84         /* get new range from controller only if super-sequence is not yet
85          * initialized from backing store or something else. */
86         if (range_is_zero(&seq->lss_super)) {
87                 rc = seq_client_alloc_super(cli, env);
88                 if (rc) {
89                         up(&seq->lss_sem);
90                         CERROR("%s: Can't allocate super-sequence, "
91                                "rc %d\n", seq->lss_name, rc);
92                         RETURN(rc);
93                 }
94
95                 /* take super-seq from client seq mgr */
96                 LASSERT(range_is_sane(&cli->lcs_range));
97
98                 seq->lss_super = cli->lcs_range;
99
100                 /* save init seq to backing store. */
101                 rc = seq_store_write(seq, env);
102                 if (rc) {
103                         CERROR("%s: Can't write sequence state, "
104                                "rc = %d\n", seq->lss_name, rc);
105                 }
106         }
107
108         up(&seq->lss_sem);
109         RETURN(rc);
110 }
111 EXPORT_SYMBOL(seq_server_set_cli);
112
113 /* on controller node, allocate new super sequence for regular sequence
114  * server. */
115 static int __seq_server_alloc_super(struct lu_server_seq *seq,
116                                     struct lu_range *in,
117                                     struct lu_range *out,
118                                     const struct lu_env *env)
119 {
120         struct lu_range *space = &seq->lss_space;
121         int rc;
122         ENTRY;
123
124         LASSERT(range_is_sane(space));
125
126         if (in != NULL) {
127                 CDEBUG(D_INFO|D_WARNING, "%s: Recovery started. Use input "
128                        "(last allocated) range "DRANGE"\n", seq->lss_name,
129                        PRANGE(in));
130
131                 if (in->lr_start > space->lr_start)
132                         space->lr_start = in->lr_start;
133                 *out = *in;
134
135                 CDEBUG(D_INFO|D_WARNING, "%s: Recovery finished. Recovered "
136                        "space: "DRANGE"\n", seq->lss_name, PRANGE(space));
137         } else {
138                 if (range_space(space) < seq->lss_super_width) {
139                         CWARN("%s: Sequences space to be exhausted soon. "
140                               "Only "LPU64" sequences left\n", seq->lss_name,
141                               range_space(space));
142                         *out = *space;
143                         space->lr_start = space->lr_end;
144                 } else if (range_is_exhausted(space)) {
145                         CERROR("%s: Sequences space is exhausted\n",
146                                seq->lss_name);
147                         RETURN(-ENOSPC);
148                 } else {
149                         range_alloc(out, space, seq->lss_super_width);
150                 }
151         }
152
153         rc = seq_store_write(seq, env);
154         if (rc) {
155                 CERROR("%s: Can't save state, rc %d\n", seq->lss_name, rc);
156                 RETURN(rc);
157         }
158
159         CDEBUG(D_INFO, "%s: Allocated super-sequence "
160                DRANGE"\n", seq->lss_name, PRANGE(out));
161
162         RETURN(rc);
163 }
164
165 int seq_server_alloc_super(struct lu_server_seq *seq,
166                            struct lu_range *in,
167                            struct lu_range *out,
168                            const struct lu_env *env)
169 {
170         int rc;
171         ENTRY;
172
173         down(&seq->lss_sem);
174         rc = __seq_server_alloc_super(seq, in, out, env);
175         up(&seq->lss_sem);
176
177         RETURN(rc);
178 }
179
180 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
181                                    struct lu_range *in,
182                                    struct lu_range *out,
183                                    const struct lu_env *env)
184 {
185         struct lu_range *super = &seq->lss_super;
186         int rc = 0;
187         ENTRY;
188
189         LASSERT(range_is_sane(super));
190
191         /*
192          * This is recovery case. Adjust super range if input range looks like
193          * it is allocated from new super.
194          */
195         if (in != NULL) {
196                 CDEBUG(D_INFO|D_WARNING, "%s: Recovery started. Use input "
197                        "(last allocated) range "DRANGE"\n", seq->lss_name,
198                        PRANGE(in));
199
200                 if (range_is_exhausted(super)) {
201                         LASSERT(in->lr_start > super->lr_start);
202
203                         /*
204                          * Server cannot send to client empty range, this is why
205                          * we check here that range from client is "newer" than
206                          * exhausted super.
207                          */
208                         super->lr_start = in->lr_start;
209
210                         super->lr_end = super->lr_start +
211                                 LUSTRE_SEQ_SUPER_WIDTH;
212                 } else {
213                         /*
214                          * Update super start by start from client's range. End
215                          * should not be changed if range was not exhausted.
216                          */
217                         if (in->lr_start > super->lr_start)
218                                 super->lr_start = in->lr_start;
219                 }
220
221                 *out = *in;
222
223                 CDEBUG(D_INFO|D_WARNING, "%s: Recovery finished. Recovered "
224                        "super: "DRANGE"\n", seq->lss_name, PRANGE(super));
225         } else {
226                 /*
227                  * XXX: avoid cascading RPCs using kind of async preallocation
228                  * when meta-sequence is close to exhausting.
229                  */
230                 if (range_is_exhausted(super)) {
231                         if (!seq->lss_cli) {
232                                 CERROR("%s: No sequence controller client "
233                                        "is setup\n", seq->lss_name);
234                                 RETURN(-EOPNOTSUPP);
235                         }
236
237                         rc = seq_client_alloc_super(seq->lss_cli, env);
238                         if (rc) {
239                                 CERROR("%s: Can't allocate super-sequence, "
240                                        "rc %d\n", seq->lss_name, rc);
241                                 RETURN(rc);
242                         }
243
244                         /* saving new range into allocation space. */
245                         *super = seq->lss_cli->lcs_range;
246                         LASSERT(range_is_sane(super));
247                 }
248                 range_alloc(out, super, seq->lss_meta_width);
249         }
250
251         rc = seq_store_write(seq, env);
252         if (rc) {
253                 CERROR("%s: Can't save state, rc = %d\n",
254                        seq->lss_name, rc);
255         }
256
257         if (rc == 0) {
258                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
259                        DRANGE"\n", seq->lss_name, PRANGE(out));
260         }
261
262         RETURN(rc);
263 }
264
265 int seq_server_alloc_meta(struct lu_server_seq *seq,
266                           struct lu_range *in,
267                           struct lu_range *out,
268                           const struct lu_env *env)
269 {
270         int rc;
271         ENTRY;
272
273         down(&seq->lss_sem);
274         rc = __seq_server_alloc_meta(seq, in, out, env);
275         up(&seq->lss_sem);
276
277         RETURN(rc);
278 }
279
280 static int seq_server_handle(struct lu_site *site,
281                              const struct lu_env *env,
282                              __u32 opc, struct lu_range *in,
283                              struct lu_range *out)
284 {
285         int rc;
286         ENTRY;
287
288         switch (opc) {
289         case SEQ_ALLOC_META:
290                 if (!site->ls_server_seq) {
291                         CERROR("Sequence server is not "
292                                "initialized\n");
293                         RETURN(-EINVAL);
294                 }
295                 rc = seq_server_alloc_meta(site->ls_server_seq,
296                                            in, out, env);
297                 break;
298         case SEQ_ALLOC_SUPER:
299                 if (!site->ls_control_seq) {
300                         CERROR("Sequence-controller is not "
301                                "initialized\n");
302                         RETURN(-EINVAL);
303                 }
304                 rc = seq_server_alloc_super(site->ls_control_seq,
305                                             in, out, env);
306                 break;
307         default:
308                 rc = -EINVAL;
309                 break;
310         }
311
312         RETURN(rc);
313 }
314
315 static int seq_req_handle(struct ptlrpc_request *req, const struct lu_env *env,
316                           struct seq_thread_info *info)
317 {
318         struct lu_range *out, *in = NULL;
319         struct lu_site *site;
320         int rc = -EPROTO;
321         __u32 *opc;
322         ENTRY;
323
324         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
325         LASSERT(site != NULL);
326                         
327         rc = req_capsule_pack(&info->sti_pill);
328         if (rc)
329                 RETURN(rc);
330
331         opc = req_capsule_client_get(&info->sti_pill,
332                                      &RMF_SEQ_OPC);
333         if (opc != NULL) {
334                 out = req_capsule_server_get(&info->sti_pill,
335                                              &RMF_SEQ_RANGE);
336                 if (out == NULL)
337                         RETURN(-EPROTO);
338
339                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
340                         in = req_capsule_client_get(&info->sti_pill,
341                                                     &RMF_SEQ_RANGE);
342
343                         LASSERT(!range_is_zero(in) && range_is_sane(in));
344                 }
345
346                 rc = seq_server_handle(site, env, *opc, in, out);
347         } else
348                 rc = -EPROTO;
349
350         RETURN(rc);
351 }
352
353 static void *seq_key_init(const struct lu_context *ctx,
354                           struct lu_context_key *key)
355 {
356         struct seq_thread_info *info;
357
358         /*
359          * check that no high order allocations are incurred.
360          */
361         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
362         OBD_ALLOC_PTR(info);
363         if (info == NULL)
364                 info = ERR_PTR(-ENOMEM);
365         return info;
366 }
367
368 static void seq_key_fini(const struct lu_context *ctx,
369                          struct lu_context_key *key, void *data)
370 {
371         struct seq_thread_info *info = data;
372         OBD_FREE_PTR(info);
373 }
374
375 struct lu_context_key seq_thread_key = {
376         .lct_tags = LCT_MD_THREAD,
377         .lct_init = seq_key_init,
378         .lct_fini = seq_key_fini
379 };
380
381 static void seq_thread_info_init(struct ptlrpc_request *req,
382                                  struct seq_thread_info *info)
383 {
384         int i;
385
386         /* mark rep buffer as req-layout stuff expects */
387         for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
388                 info->sti_rep_buf_size[i] = -1;
389
390         /* init request capsule */
391         req_capsule_init(&info->sti_pill, req, RCL_SERVER,
392                          info->sti_rep_buf_size);
393
394         req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
395 }
396
397 static void seq_thread_info_fini(struct seq_thread_info *info)
398 {
399         req_capsule_fini(&info->sti_pill);
400 }
401
402 static int seq_handle(struct ptlrpc_request *req)
403 {
404         const struct lu_env *env;
405         struct seq_thread_info *info;
406         int rc;
407
408         env = req->rq_svc_thread->t_env;
409         LASSERT(env != NULL);
410
411         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
412         LASSERT(info != NULL);
413
414         seq_thread_info_init(req, info);
415         rc = seq_req_handle(req, env, info);
416         seq_thread_info_fini(info);
417
418         return rc;
419 }
420
421 /*
422  * Entry point for handling FLD RPCs called from MDT.
423  */
424 int seq_query(struct com_thread_info *info)
425 {
426         return seq_handle(info->cti_pill.rc_req);
427 }
428 EXPORT_SYMBOL(seq_query);
429
430 static void seq_server_proc_fini(struct lu_server_seq *seq);
431
432 #ifdef LPROCFS
433 static int seq_server_proc_init(struct lu_server_seq *seq)
434 {
435         int rc;
436         ENTRY;
437
438         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
439                                              seq_type_proc_dir,
440                                              NULL, NULL);
441         if (IS_ERR(seq->lss_proc_dir)) {
442                 rc = PTR_ERR(seq->lss_proc_dir);
443                 RETURN(rc);
444         }
445
446         rc = lprocfs_add_vars(seq->lss_proc_dir,
447                               seq_server_proc_list, seq);
448         if (rc) {
449                 CERROR("%s: Can't init sequence manager "
450                        "proc, rc %d\n", seq->lss_name, rc);
451                 GOTO(out_cleanup, rc);
452         }
453
454         RETURN(0);
455
456 out_cleanup:
457         seq_server_proc_fini(seq);
458         return rc;
459 }
460
461 static void seq_server_proc_fini(struct lu_server_seq *seq)
462 {
463         ENTRY;
464         if (seq->lss_proc_dir != NULL) {
465                 if (!IS_ERR(seq->lss_proc_dir))
466                         lprocfs_remove(seq->lss_proc_dir);
467                 seq->lss_proc_dir = NULL;
468         }
469         EXIT;
470 }
471 #else
472 static int seq_server_proc_init(struct lu_server_seq *seq)
473 {
474         return 0;
475 }
476
477 static void seq_server_proc_fini(struct lu_server_seq *seq)
478 {
479         return;
480 }
481 #endif
482
483 int seq_server_init(struct lu_server_seq *seq,
484                     struct dt_device *dev,
485                     const char *prefix,
486                     enum lu_mgr_type type,
487                     const struct lu_env *env)
488 {
489         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
490         ENTRY;
491
492         LASSERT(dev != NULL);
493         LASSERT(prefix != NULL);
494
495         seq->lss_cli = NULL;
496         seq->lss_type = type;
497         sema_init(&seq->lss_sem, 1);
498
499         seq->lss_super_width = LUSTRE_SEQ_SUPER_WIDTH;
500         seq->lss_meta_width = LUSTRE_SEQ_META_WIDTH;
501
502         snprintf(seq->lss_name, sizeof(seq->lss_name),
503                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
504
505         seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
506         seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
507
508         rc = seq_store_init(seq, env, dev);
509         if (rc)
510                 GOTO(out, rc);
511
512         /* request backing store for saved sequence info */
513         rc = seq_store_read(seq, env);
514         if (rc == -ENODATA) {
515                 CDEBUG(D_INFO|D_WARNING, "%s: No data found "
516                        "on storage, %s\n", seq->lss_name,
517                        is_srv ? "wait for controller attach" :
518                        "this is first controller run");
519         } else if (rc) {
520                 CERROR("%s: Can't read sequence state, rc %d\n",
521                        seq->lss_name, rc);
522                 GOTO(out, rc);
523         }
524
525         rc  = seq_server_proc_init(seq);
526         if (rc)
527                 GOTO(out, rc);
528
529         EXIT;
530 out:
531         if (rc)
532                 seq_server_fini(seq, env);
533         return rc;
534 }
535 EXPORT_SYMBOL(seq_server_init);
536
537 void seq_server_fini(struct lu_server_seq *seq,
538                      const struct lu_env *env)
539 {
540         ENTRY;
541
542         seq_server_proc_fini(seq);
543         seq_store_fini(seq, env);
544
545         EXIT;
546 }
547 EXPORT_SYMBOL(seq_server_fini);
548
549 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
550
551 static int __init fid_mod_init(void)
552 {
553         printk(KERN_INFO "Lustre: Sequence Manager; "
554                "info@clusterfs.com\n");
555
556         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
557                                              proc_lustre_root,
558                                              NULL, NULL);
559         if (IS_ERR(seq_type_proc_dir))
560                 return PTR_ERR(seq_type_proc_dir);
561
562         lu_context_key_register(&seq_thread_key);
563         return 0;
564 }
565
566 static void __exit fid_mod_exit(void)
567 {
568         lu_context_key_degister(&seq_thread_key);
569         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
570                 lprocfs_remove(seq_type_proc_dir);
571                 seq_type_proc_dir = NULL;
572         }
573 }
574
575 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
576 MODULE_DESCRIPTION("Lustre FID Module");
577 MODULE_LICENSE("GPL");
578
579 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
580 #endif