Whamcloud - gitweb
some code cleanup in split.
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* Assigns client to sequence controller node. */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53                        struct lu_client_seq *cli,
54                        const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         /*
60          * Ask client for new range, assign that range to ->seq_space and write
61          * seq state to backing store should be atomic.
62          */
63         down(&seq->lss_sem);
64
65         if (cli == NULL) {
66                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
67                        seq->lss_name, cli->lcs_name);
68                 seq->lss_cli = cli;
69                 GOTO(out_up, rc = 0);
70         }
71
72         if (seq->lss_cli != NULL) {
73                 CERROR("%s: Sequence controller is already "
74                        "assigned\n", seq->lss_name);
75                 GOTO(out_up, rc = -EINVAL);
76         }
77
78         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
79                seq->lss_name, cli->lcs_name);
80
81         seq->lss_cli = cli;
82         EXIT;
83 out_up:
84         up(&seq->lss_sem);
85         return rc;
86 }
87 EXPORT_SYMBOL(seq_server_set_cli);
88
89 /*
90  * On controller node, allocate new super sequence for regular sequence server.
91  */
92 static int __seq_server_alloc_super(struct lu_server_seq *seq,
93                                     struct lu_range *in,
94                                     struct lu_range *out,
95                                     const struct lu_env *env)
96 {
97         struct lu_range *space = &seq->lss_space;
98         int rc;
99         ENTRY;
100
101         LASSERT(range_is_sane(space));
102
103         if (in != NULL) {
104                 CDEBUG(D_INFO, "%s: Input seq range: "
105                        DRANGE"\n", seq->lss_name, PRANGE(in));
106
107                 if (in->lr_end > space->lr_start)
108                         space->lr_start = in->lr_end;
109                 *out = *in;
110
111                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
112                        seq->lss_name, PRANGE(space));
113         } else {
114                 if (range_space(space) < seq->lss_width) {
115                         CWARN("%s: Sequences space to be exhausted soon. "
116                               "Only "LPU64" sequences left\n", seq->lss_name,
117                               range_space(space));
118                         *out = *space;
119                         space->lr_start = space->lr_end;
120                 } else if (range_is_exhausted(space)) {
121                         CERROR("%s: Sequences space is exhausted\n",
122                                seq->lss_name);
123                         RETURN(-ENOSPC);
124                 } else {
125                         range_alloc(out, space, seq->lss_width);
126                 }
127         }
128
129         rc = seq_store_write(seq, env);
130         if (rc) {
131                 CERROR("%s: Can't write space data, rc %d\n",
132                        seq->lss_name, rc);
133                 RETURN(rc);
134         }
135
136         CDEBUG(D_INFO, "%s: Allocated super-sequence "
137                DRANGE"\n", seq->lss_name, PRANGE(out));
138
139         RETURN(rc);
140 }
141
142 int seq_server_alloc_super(struct lu_server_seq *seq,
143                            struct lu_range *in,
144                            struct lu_range *out,
145                            const struct lu_env *env)
146 {
147         int rc;
148         ENTRY;
149
150         down(&seq->lss_sem);
151         rc = __seq_server_alloc_super(seq, in, out, env);
152         up(&seq->lss_sem);
153
154         RETURN(rc);
155 }
156
157 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
158                                    struct lu_range *in,
159                                    struct lu_range *out,
160                                    const struct lu_env *env)
161 {
162         struct lu_range *space = &seq->lss_space;
163         int rc = 0;
164         ENTRY;
165
166         LASSERT(range_is_sane(space));
167
168         /*
169          * This is recovery case. Adjust super range if input range looks like
170          * it is allocated from new super.
171          */
172         if (in != NULL) {
173                 CDEBUG(D_INFO, "%s: Input seq range: "
174                        DRANGE"\n", seq->lss_name, PRANGE(in));
175
176                 if (range_is_exhausted(space)) {
177                         /*
178                          * Server cannot send empty range to client, this is why
179                          * we check here that range from client is "newer" than
180                          * exhausted super.
181                          */
182                         LASSERT(in->lr_end > space->lr_start);
183
184                         /*
185                          * Start is set to end of last allocated, because it
186                          * *is* already allocated so we take that into account
187                          * and do not use for other allocations.
188                          */
189                         space->lr_start = in->lr_end;
190
191                         /*
192                          * End is set to in->lr_start + super sequence
193                          * allocation unit. That is because in->lr_start is
194                          * first seq in new allocated range from controller
195                          * before failure.
196                          */
197                         space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
198
199                         if (!seq->lss_cli) {
200                                 CERROR("%s: No sequence controller "
201                                        "is attached.\n", seq->lss_name);
202                                 RETURN(-ENODEV);
203                         }
204
205                         /*
206                          * Let controller know that this is recovery and last
207                          * obtained range from it was @space.
208                          */
209                         rc = seq_client_replay_super(seq->lss_cli, space, env);
210                         if (rc) {
211                                 CERROR("%s: Can't replay super-sequence, "
212                                        "rc %d\n", seq->lss_name, rc);
213                                 RETURN(rc);
214                         }
215                 } else {
216                         /*
217                          * Update super start by end from client's range. Super
218                          * end should not be changed if range was not exhausted.
219                          */
220                         if (in->lr_end > space->lr_start)
221                                 space->lr_start = in->lr_end;
222                 }
223
224                 *out = *in;
225
226                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
227                        seq->lss_name, PRANGE(space));
228         } else {
229                 /*
230                  * XXX: Avoid cascading RPCs using kind of async preallocation
231                  * when meta-sequence is close to exhausting.
232                  */
233                 if (range_is_exhausted(space)) {
234                         if (!seq->lss_cli) {
235                                 CERROR("%s: No sequence controller "
236                                        "is attached.\n", seq->lss_name);
237                                 RETURN(-ENODEV);
238                         }
239
240                         rc = seq_client_alloc_super(seq->lss_cli, env);
241                         if (rc) {
242                                 CERROR("%s: Can't allocate super-sequence, "
243                                        "rc %d\n", seq->lss_name, rc);
244                                 RETURN(rc);
245                         }
246
247                         /* Saving new range to allocation space. */
248                         *space = seq->lss_cli->lcs_space;
249                         LASSERT(range_is_sane(space));
250                 }
251
252                 range_alloc(out, space, seq->lss_width);
253         }
254
255         rc = seq_store_write(seq, env);
256         if (rc) {
257                 CERROR("%s: Can't write space data, rc %d\n",
258                        seq->lss_name, rc);
259         }
260
261         if (rc == 0) {
262                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
263                        DRANGE"\n", seq->lss_name, PRANGE(out));
264         }
265
266         RETURN(rc);
267 }
268
269 int seq_server_alloc_meta(struct lu_server_seq *seq,
270                           struct lu_range *in,
271                           struct lu_range *out,
272                           const struct lu_env *env)
273 {
274         int rc;
275         ENTRY;
276
277         down(&seq->lss_sem);
278         rc = __seq_server_alloc_meta(seq, in, out, env);
279         up(&seq->lss_sem);
280
281         RETURN(rc);
282 }
283
284 static int seq_server_handle(struct lu_site *site,
285                              const struct lu_env *env,
286                              __u32 opc, struct lu_range *in,
287                              struct lu_range *out)
288 {
289         int rc;
290         ENTRY;
291
292         switch (opc) {
293         case SEQ_ALLOC_META:
294                 if (!site->ls_server_seq) {
295                         CERROR("Sequence server is not "
296                                "initialized\n");
297                         RETURN(-EINVAL);
298                 }
299                 rc = seq_server_alloc_meta(site->ls_server_seq,
300                                            in, out, env);
301                 break;
302         case SEQ_ALLOC_SUPER:
303                 if (!site->ls_control_seq) {
304                         CERROR("Sequence controller is not "
305                                "initialized\n");
306                         RETURN(-EINVAL);
307                 }
308                 rc = seq_server_alloc_super(site->ls_control_seq,
309                                             in, out, env);
310                 break;
311         default:
312                 rc = -EINVAL;
313                 break;
314         }
315
316         RETURN(rc);
317 }
318
319 static int seq_req_handle(struct ptlrpc_request *req,
320                           const struct lu_env *env,
321                           struct seq_thread_info *info)
322 {
323         struct lu_range *out, *in = NULL;
324         struct lu_site *site;
325         int rc = -EPROTO;
326         __u32 *opc;
327         ENTRY;
328
329         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
330         LASSERT(site != NULL);
331                         
332         rc = req_capsule_pack(&info->sti_pill);
333         if (rc)
334                 RETURN(err_serious(rc));
335
336         opc = req_capsule_client_get(&info->sti_pill,
337                                      &RMF_SEQ_OPC);
338         if (opc != NULL) {
339                 out = req_capsule_server_get(&info->sti_pill,
340                                              &RMF_SEQ_RANGE);
341                 if (out == NULL)
342                         RETURN(err_serious(-EPROTO));
343
344                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
345                         in = req_capsule_client_get(&info->sti_pill,
346                                                     &RMF_SEQ_RANGE);
347
348                         LASSERT(!range_is_zero(in) && range_is_sane(in));
349                 }
350
351                 rc = seq_server_handle(site, env, *opc, in, out);
352         } else
353                 rc = err_serious(-EPROTO);
354
355         RETURN(rc);
356 }
357
358 static void *seq_key_init(const struct lu_context *ctx,
359                           struct lu_context_key *key)
360 {
361         struct seq_thread_info *info;
362
363         /*
364          * check that no high order allocations are incurred.
365          */
366         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
367         OBD_ALLOC_PTR(info);
368         if (info == NULL)
369                 info = ERR_PTR(-ENOMEM);
370         return info;
371 }
372
373 static void seq_key_fini(const struct lu_context *ctx,
374                          struct lu_context_key *key, void *data)
375 {
376         struct seq_thread_info *info = data;
377         OBD_FREE_PTR(info);
378 }
379
380 struct lu_context_key seq_thread_key = {
381         .lct_tags = LCT_MD_THREAD,
382         .lct_init = seq_key_init,
383         .lct_fini = seq_key_fini
384 };
385
386 static void seq_thread_info_init(struct ptlrpc_request *req,
387                                  struct seq_thread_info *info)
388 {
389         int i;
390
391         /* Mark rep buffer as req-layout stuff expects */
392         for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
393                 info->sti_rep_buf_size[i] = -1;
394
395         /* Init request capsule */
396         req_capsule_init(&info->sti_pill, req, RCL_SERVER,
397                          info->sti_rep_buf_size);
398
399         req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
400 }
401
402 static void seq_thread_info_fini(struct seq_thread_info *info)
403 {
404         req_capsule_fini(&info->sti_pill);
405 }
406
407 static int seq_handle(struct ptlrpc_request *req)
408 {
409         const struct lu_env *env;
410         struct seq_thread_info *info;
411         int rc;
412
413         env = req->rq_svc_thread->t_env;
414         LASSERT(env != NULL);
415
416         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
417         LASSERT(info != NULL);
418
419         seq_thread_info_init(req, info);
420         rc = seq_req_handle(req, env, info);
421         seq_thread_info_fini(info);
422
423         return rc;
424 }
425
426 /*
427  * Entry point for handling FLD RPCs called from MDT.
428  */
429 int seq_query(struct com_thread_info *info)
430 {
431         return seq_handle(info->cti_pill.rc_req);
432 }
433 EXPORT_SYMBOL(seq_query);
434
435 static void seq_server_proc_fini(struct lu_server_seq *seq);
436
437 #ifdef LPROCFS
438 static int seq_server_proc_init(struct lu_server_seq *seq)
439 {
440         int rc;
441         ENTRY;
442
443         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
444                                              seq_type_proc_dir,
445                                              NULL, NULL);
446         if (IS_ERR(seq->lss_proc_dir)) {
447                 rc = PTR_ERR(seq->lss_proc_dir);
448                 RETURN(rc);
449         }
450
451         rc = lprocfs_add_vars(seq->lss_proc_dir,
452                               seq_server_proc_list, seq);
453         if (rc) {
454                 CERROR("%s: Can't init sequence manager "
455                        "proc, rc %d\n", seq->lss_name, rc);
456                 GOTO(out_cleanup, rc);
457         }
458
459         RETURN(0);
460
461 out_cleanup:
462         seq_server_proc_fini(seq);
463         return rc;
464 }
465
466 static void seq_server_proc_fini(struct lu_server_seq *seq)
467 {
468         ENTRY;
469         if (seq->lss_proc_dir != NULL) {
470                 if (!IS_ERR(seq->lss_proc_dir))
471                         lprocfs_remove(seq->lss_proc_dir);
472                 seq->lss_proc_dir = NULL;
473         }
474         EXIT;
475 }
476 #else
477 static int seq_server_proc_init(struct lu_server_seq *seq)
478 {
479         return 0;
480 }
481
482 static void seq_server_proc_fini(struct lu_server_seq *seq)
483 {
484         return;
485 }
486 #endif
487
488 int seq_server_init(struct lu_server_seq *seq,
489                     struct dt_device *dev,
490                     const char *prefix,
491                     enum lu_mgr_type type,
492                     const struct lu_env *env)
493 {
494         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
495         ENTRY;
496
497         LASSERT(dev != NULL);
498         LASSERT(prefix != NULL);
499
500         seq->lss_cli = NULL;
501         seq->lss_type = type;
502         range_zero(&seq->lss_space);
503         sema_init(&seq->lss_sem, 1);
504
505         seq->lss_width = is_srv ?
506                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
507
508         snprintf(seq->lss_name, sizeof(seq->lss_name),
509                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
510
511         rc = seq_store_init(seq, env, dev);
512         if (rc)
513                 GOTO(out, rc);
514
515         /* Request backing store for saved sequence info. */
516         rc = seq_store_read(seq, env);
517         if (rc == -ENODATA) {
518
519                 /* Nothing is read, init by default value. */
520                 seq->lss_space = is_srv ?
521                         LUSTRE_SEQ_ZERO_RANGE:
522                         LUSTRE_SEQ_SPACE_RANGE;
523
524                 CDEBUG(D_INFO, "%s: No data found "
525                        "on store. Initialize space\n",
526                        seq->lss_name);
527
528                 /* Save default controller value to store. */
529                 rc = seq_store_write(seq, env);
530                 if (rc) {
531                         CERROR("%s: Can't write space data, "
532                                "rc %d\n", seq->lss_name, rc);
533                 }
534         } else if (rc) {
535                 CERROR("%s: Can't read space data, rc %d\n",
536                        seq->lss_name, rc);
537                 GOTO(out, rc);
538         }
539
540         if (is_srv) {
541                 LASSERT(range_is_sane(&seq->lss_space));
542         } else {
543                 LASSERT(!range_is_zero(&seq->lss_space) &&
544                         range_is_sane(&seq->lss_space));
545         }
546
547         rc  = seq_server_proc_init(seq);
548         if (rc)
549                 GOTO(out, rc);
550
551         EXIT;
552 out:
553         if (rc)
554                 seq_server_fini(seq, env);
555         return rc;
556 }
557 EXPORT_SYMBOL(seq_server_init);
558
559 void seq_server_fini(struct lu_server_seq *seq,
560                      const struct lu_env *env)
561 {
562         ENTRY;
563
564         seq_server_proc_fini(seq);
565         seq_store_fini(seq, env);
566
567         EXIT;
568 }
569 EXPORT_SYMBOL(seq_server_fini);
570
571 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
572
573 static int __init fid_mod_init(void)
574 {
575         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
576                                              proc_lustre_root,
577                                              NULL, NULL);
578         if (IS_ERR(seq_type_proc_dir))
579                 return PTR_ERR(seq_type_proc_dir);
580
581         lu_context_key_register(&seq_thread_key);
582         return 0;
583 }
584
585 static void __exit fid_mod_exit(void)
586 {
587         lu_context_key_degister(&seq_thread_key);
588         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
589                 lprocfs_remove(seq_type_proc_dir);
590                 seq_type_proc_dir = NULL;
591         }
592 }
593
594 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
595 MODULE_DESCRIPTION("Lustre FID Module");
596 MODULE_LICENSE("GPL");
597
598 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
599 #endif