Whamcloud - gitweb
patches related to bug 13377 (CMD small fixes), 2+4 patch and fid_unpack patch
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* Assigns client to sequence controller node. */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53                        struct lu_client_seq *cli,
54                        const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         /*
60          * Ask client for new range, assign that range to ->seq_space and write
61          * seq state to backing store should be atomic.
62          */
63         down(&seq->lss_sem);
64
65         if (cli == NULL) {
66                 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
67                        seq->lss_name, cli->lcs_name);
68                 seq->lss_cli = cli;
69                 GOTO(out_up, rc = 0);
70         }
71
72         if (seq->lss_cli != NULL) {
73                 CERROR("%s: Sequence controller is already "
74                        "assigned\n", seq->lss_name);
75                 GOTO(out_up, rc = -EINVAL);
76         }
77
78         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
79                seq->lss_name, cli->lcs_name);
80
81         seq->lss_cli = cli;
82         EXIT;
83 out_up:
84         up(&seq->lss_sem);
85         return rc;
86 }
87 EXPORT_SYMBOL(seq_server_set_cli);
88
89 /*
90  * On controller node, allocate new super sequence for regular sequence server.
91  */
92 static int __seq_server_alloc_super(struct lu_server_seq *seq,
93                                     struct lu_range *in,
94                                     struct lu_range *out,
95                                     const struct lu_env *env)
96 {
97         struct lu_range *space = &seq->lss_space;
98         int rc;
99         ENTRY;
100
101         LASSERT(range_is_sane(space));
102
103         if (in != NULL) {
104                 CDEBUG(D_INFO, "%s: Input seq range: "
105                        DRANGE"\n", seq->lss_name, PRANGE(in));
106
107                 if (in->lr_end > space->lr_start)
108                         space->lr_start = in->lr_end;
109                 *out = *in;
110
111                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
112                        seq->lss_name, PRANGE(space));
113         } else {
114                 if (range_space(space) < seq->lss_width) {
115                         CWARN("%s: Sequences space to be exhausted soon. "
116                               "Only "LPU64" sequences left\n", seq->lss_name,
117                               range_space(space));
118                         *out = *space;
119                         space->lr_start = space->lr_end;
120                 } else if (range_is_exhausted(space)) {
121                         CERROR("%s: Sequences space is exhausted\n",
122                                seq->lss_name);
123                         RETURN(-ENOSPC);
124                 } else {
125                         range_alloc(out, space, seq->lss_width);
126                 }
127         }
128
129         rc = seq_store_write(seq, env);
130         if (rc) {
131                 CERROR("%s: Can't write space data, rc %d\n",
132                        seq->lss_name, rc);
133                 RETURN(rc);
134         }
135
136         CDEBUG(D_INFO, "%s: Allocated super-sequence "
137                DRANGE"\n", seq->lss_name, PRANGE(out));
138
139         RETURN(rc);
140 }
141
142 int seq_server_alloc_super(struct lu_server_seq *seq,
143                            struct lu_range *in,
144                            struct lu_range *out,
145                            const struct lu_env *env)
146 {
147         int rc;
148         ENTRY;
149
150         down(&seq->lss_sem);
151         rc = __seq_server_alloc_super(seq, in, out, env);
152         up(&seq->lss_sem);
153
154         RETURN(rc);
155 }
156
157 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
158                                    struct lu_range *in,
159                                    struct lu_range *out,
160                                    const struct lu_env *env)
161 {
162         struct lu_range *space = &seq->lss_space;
163         int rc = 0;
164         ENTRY;
165
166         LASSERT(range_is_sane(space));
167
168         /*
169          * This is recovery case. Adjust super range if input range looks like
170          * it is allocated from new super.
171          */
172         if (in != NULL) {
173                 CDEBUG(D_INFO, "%s: Input seq range: "
174                        DRANGE"\n", seq->lss_name, PRANGE(in));
175
176                 if (range_is_exhausted(space)) {
177                         /*
178                          * Server cannot send empty range to client, this is why
179                          * we check here that range from client is "newer" than
180                          * exhausted super.
181                          */
182                         LASSERT(in->lr_end > space->lr_start);
183
184                         /*
185                          * Start is set to end of last allocated, because it
186                          * *is* already allocated so we take that into account
187                          * and do not use for other allocations.
188                          */
189                         space->lr_start = in->lr_end;
190
191                         /*
192                          * End is set to in->lr_start + super sequence
193                          * allocation unit. That is because in->lr_start is
194                          * first seq in new allocated range from controller
195                          * before failure.
196                          */
197                         space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
198
199                         if (!seq->lss_cli) {
200                                 CERROR("%s: No sequence controller "
201                                        "is attached.\n", seq->lss_name);
202                                 RETURN(-ENODEV);
203                         }
204
205                         /*
206                          * Let controller know that this is recovery and last
207                          * obtained range from it was @space.
208                          */
209                         rc = seq_client_replay_super(seq->lss_cli, space, env);
210                         if (rc) {
211                                 CERROR("%s: Can't replay super-sequence, "
212                                        "rc %d\n", seq->lss_name, rc);
213                                 RETURN(rc);
214                         }
215                 } else {
216                         /*
217                          * Update super start by end from client's range. Super
218                          * end should not be changed if range was not exhausted.
219                          */
220                         if (in->lr_end > space->lr_start)
221                                 space->lr_start = in->lr_end;
222                 }
223
224                 *out = *in;
225
226                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
227                        seq->lss_name, PRANGE(space));
228         } else {
229                 /*
230                  * XXX: Avoid cascading RPCs using kind of async preallocation
231                  * when meta-sequence is close to exhausting.
232                  */
233                 if (range_is_exhausted(space)) {
234                         if (!seq->lss_cli) {
235                                 CERROR("%s: No sequence controller "
236                                        "is attached.\n", seq->lss_name);
237                                 RETURN(-ENODEV);
238                         }
239
240                         rc = seq_client_alloc_super(seq->lss_cli, env);
241                         if (rc) {
242                                 CERROR("%s: Can't allocate super-sequence, "
243                                        "rc %d\n", seq->lss_name, rc);
244                                 RETURN(rc);
245                         }
246
247                         /* Saving new range to allocation space. */
248                         *space = seq->lss_cli->lcs_space;
249                         LASSERT(range_is_sane(space));
250                 }
251
252                 range_alloc(out, space, seq->lss_width);
253         }
254
255         rc = seq_store_write(seq, env);
256         if (rc) {
257                 CERROR("%s: Can't write space data, rc %d\n",
258                        seq->lss_name, rc);
259         }
260
261         if (rc == 0) {
262                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
263                        DRANGE"\n", seq->lss_name, PRANGE(out));
264         }
265
266         RETURN(rc);
267 }
268
269 int seq_server_alloc_meta(struct lu_server_seq *seq,
270                           struct lu_range *in,
271                           struct lu_range *out,
272                           const struct lu_env *env)
273 {
274         int rc;
275         ENTRY;
276
277         down(&seq->lss_sem);
278         rc = __seq_server_alloc_meta(seq, in, out, env);
279         up(&seq->lss_sem);
280
281         RETURN(rc);
282 }
283 EXPORT_SYMBOL(seq_server_alloc_meta);
284
285 static int seq_server_handle(struct lu_site *site,
286                              const struct lu_env *env,
287                              __u32 opc, struct lu_range *in,
288                              struct lu_range *out)
289 {
290         int rc;
291         ENTRY;
292
293         switch (opc) {
294         case SEQ_ALLOC_META:
295                 if (!site->ls_server_seq) {
296                         CERROR("Sequence server is not "
297                                "initialized\n");
298                         RETURN(-EINVAL);
299                 }
300                 rc = seq_server_alloc_meta(site->ls_server_seq,
301                                            in, out, env);
302                 break;
303         case SEQ_ALLOC_SUPER:
304                 if (!site->ls_control_seq) {
305                         CERROR("Sequence controller is not "
306                                "initialized\n");
307                         RETURN(-EINVAL);
308                 }
309                 rc = seq_server_alloc_super(site->ls_control_seq,
310                                             in, out, env);
311                 break;
312         default:
313                 rc = -EINVAL;
314                 break;
315         }
316
317         RETURN(rc);
318 }
319
320 static int seq_req_handle(struct ptlrpc_request *req,
321                           const struct lu_env *env,
322                           struct seq_thread_info *info)
323 {
324         struct lu_range *out, *in = NULL;
325         struct lu_site *site;
326         int rc = -EPROTO;
327         __u32 *opc;
328         ENTRY;
329
330         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
331         LASSERT(site != NULL);
332                         
333         rc = req_capsule_pack(&info->sti_pill);
334         if (rc)
335                 RETURN(err_serious(rc));
336
337         opc = req_capsule_client_get(&info->sti_pill,
338                                      &RMF_SEQ_OPC);
339         if (opc != NULL) {
340                 out = req_capsule_server_get(&info->sti_pill,
341                                              &RMF_SEQ_RANGE);
342                 if (out == NULL)
343                         RETURN(err_serious(-EPROTO));
344
345                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
346                         in = req_capsule_client_get(&info->sti_pill,
347                                                     &RMF_SEQ_RANGE);
348
349                         LASSERT(!range_is_zero(in) && range_is_sane(in));
350                 }
351
352                 rc = seq_server_handle(site, env, *opc, in, out);
353         } else
354                 rc = err_serious(-EPROTO);
355
356         RETURN(rc);
357 }
358
359 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
360
361 struct lu_context_key seq_thread_key = {
362         .lct_tags = LCT_MD_THREAD,
363         .lct_init = seq_key_init,
364         .lct_fini = seq_key_fini
365 };
366
367 static void seq_thread_info_init(struct ptlrpc_request *req,
368                                  struct seq_thread_info *info)
369 {
370         int i;
371
372         /* Mark rep buffer as req-layout stuff expects */
373         for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
374                 info->sti_rep_buf_size[i] = -1;
375
376         /* Init request capsule */
377         req_capsule_init(&info->sti_pill, req, RCL_SERVER,
378                          info->sti_rep_buf_size);
379
380         req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
381 }
382
383 static void seq_thread_info_fini(struct seq_thread_info *info)
384 {
385         req_capsule_fini(&info->sti_pill);
386 }
387
388 static int seq_handle(struct ptlrpc_request *req)
389 {
390         const struct lu_env *env;
391         struct seq_thread_info *info;
392         int rc;
393
394         env = req->rq_svc_thread->t_env;
395         LASSERT(env != NULL);
396
397         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
398         LASSERT(info != NULL);
399
400         seq_thread_info_init(req, info);
401         rc = seq_req_handle(req, env, info);
402         seq_thread_info_fini(info);
403
404         return rc;
405 }
406
407 /*
408  * Entry point for handling FLD RPCs called from MDT.
409  */
410 int seq_query(struct com_thread_info *info)
411 {
412         return seq_handle(info->cti_pill.rc_req);
413 }
414 EXPORT_SYMBOL(seq_query);
415
416 static void seq_server_proc_fini(struct lu_server_seq *seq);
417
418 #ifdef LPROCFS
419 static int seq_server_proc_init(struct lu_server_seq *seq)
420 {
421         int rc;
422         ENTRY;
423
424         seq->lss_proc_dir = lprocfs_register(seq->lss_name,
425                                              seq_type_proc_dir,
426                                              NULL, NULL);
427         if (IS_ERR(seq->lss_proc_dir)) {
428                 rc = PTR_ERR(seq->lss_proc_dir);
429                 RETURN(rc);
430         }
431
432         rc = lprocfs_add_vars(seq->lss_proc_dir,
433                               seq_server_proc_list, seq);
434         if (rc) {
435                 CERROR("%s: Can't init sequence manager "
436                        "proc, rc %d\n", seq->lss_name, rc);
437                 GOTO(out_cleanup, rc);
438         }
439
440         RETURN(0);
441
442 out_cleanup:
443         seq_server_proc_fini(seq);
444         return rc;
445 }
446
447 static void seq_server_proc_fini(struct lu_server_seq *seq)
448 {
449         ENTRY;
450         if (seq->lss_proc_dir != NULL) {
451                 if (!IS_ERR(seq->lss_proc_dir))
452                         lprocfs_remove(&seq->lss_proc_dir);
453                 seq->lss_proc_dir = NULL;
454         }
455         EXIT;
456 }
457 #else
458 static int seq_server_proc_init(struct lu_server_seq *seq)
459 {
460         return 0;
461 }
462
463 static void seq_server_proc_fini(struct lu_server_seq *seq)
464 {
465         return;
466 }
467 #endif
468
469 int seq_server_init(struct lu_server_seq *seq,
470                     struct dt_device *dev,
471                     const char *prefix,
472                     enum lu_mgr_type type,
473                     const struct lu_env *env)
474 {
475         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
476         ENTRY;
477
478         LASSERT(dev != NULL);
479         LASSERT(prefix != NULL);
480
481         seq->lss_cli = NULL;
482         seq->lss_type = type;
483         range_zero(&seq->lss_space);
484         sema_init(&seq->lss_sem, 1);
485
486         seq->lss_width = is_srv ?
487                 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
488
489         snprintf(seq->lss_name, sizeof(seq->lss_name),
490                  "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
491
492         rc = seq_store_init(seq, env, dev);
493         if (rc)
494                 GOTO(out, rc);
495
496         /* Request backing store for saved sequence info. */
497         rc = seq_store_read(seq, env);
498         if (rc == -ENODATA) {
499
500                 /* Nothing is read, init by default value. */
501                 seq->lss_space = is_srv ?
502                         LUSTRE_SEQ_ZERO_RANGE:
503                         LUSTRE_SEQ_SPACE_RANGE;
504
505                 CDEBUG(D_INFO, "%s: No data found "
506                        "on store. Initialize space\n",
507                        seq->lss_name);
508
509                 /* Save default controller value to store. */
510                 rc = seq_store_write(seq, env);
511                 if (rc) {
512                         CERROR("%s: Can't write space data, "
513                                "rc %d\n", seq->lss_name, rc);
514                 }
515         } else if (rc) {
516                 CERROR("%s: Can't read space data, rc %d\n",
517                        seq->lss_name, rc);
518                 GOTO(out, rc);
519         }
520
521         if (is_srv) {
522                 LASSERT(range_is_sane(&seq->lss_space));
523         } else {
524                 LASSERT(!range_is_zero(&seq->lss_space) &&
525                         range_is_sane(&seq->lss_space));
526         }
527
528         rc  = seq_server_proc_init(seq);
529         if (rc)
530                 GOTO(out, rc);
531
532         EXIT;
533 out:
534         if (rc)
535                 seq_server_fini(seq, env);
536         return rc;
537 }
538 EXPORT_SYMBOL(seq_server_init);
539
540 void seq_server_fini(struct lu_server_seq *seq,
541                      const struct lu_env *env)
542 {
543         ENTRY;
544
545         seq_server_proc_fini(seq);
546         seq_store_fini(seq, env);
547
548         EXIT;
549 }
550 EXPORT_SYMBOL(seq_server_fini);
551
552 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
553
554 static int __init fid_mod_init(void)
555 {
556         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
557                                              proc_lustre_root,
558                                              NULL, NULL);
559         if (IS_ERR(seq_type_proc_dir))
560                 return PTR_ERR(seq_type_proc_dir);
561
562         LU_CONTEXT_KEY_INIT(&seq_thread_key);
563         lu_context_key_register(&seq_thread_key);
564         return 0;
565 }
566
567 static void __exit fid_mod_exit(void)
568 {
569         lu_context_key_degister(&seq_thread_key);
570         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
571                 lprocfs_remove(&seq_type_proc_dir);
572                 seq_type_proc_dir = NULL;
573         }
574 }
575
576 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
577 MODULE_DESCRIPTION("Lustre FID Module");
578 MODULE_LICENSE("GPL");
579
580 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);
581 #endif