Whamcloud - gitweb
- fixed possible deadlock in server side seq-mgr, however seq-mgr still has few issue...
[fs/lustre-release.git] / lustre / fid / fid_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_handler.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
49
50 #ifdef __KERNEL__
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52  * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
54         (0x400),
55         ((__u64)~0ULL)
56 };
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
58
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
61         0,
62         0
63 };
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
65
66 static int
67 seq_server_write_state(struct lu_server_seq *seq,
68                        const struct lu_context *ctx)
69 {
70         int rc = 0;
71         ENTRY;
72
73         /* XXX: here should be calling struct dt_device methods to write
74          * sequence state to backing store. */
75
76         RETURN(rc);
77 }
78
79 static int
80 seq_server_read_state(struct lu_server_seq *seq,
81                       const struct lu_context *ctx)
82 {
83         int rc = -ENODATA;
84         ENTRY;
85         
86         /* XXX: here should be calling struct dt_device methods to read the
87          * sequence state from backing store. */
88
89         RETURN(rc);
90 }
91
92 /* on controller node, allocate new super sequence for regular sequnece
93  * server. */
94 static int
95 __seq_server_alloc_super(struct lu_server_seq *seq,
96                          struct lu_range *range,
97                          const struct lu_context *ctx)
98 {
99         struct lu_range *space = &seq->seq_space;
100         int rc;
101         ENTRY;
102
103         LASSERT(range_is_sane(space));
104         
105         if (range_space(space) < LUSTRE_SEQ_SUPER_WIDTH) {
106                 CWARN("sequences space is going to exhauste soon. "
107                       "Only can allocate "LPU64" sequences\n",
108                       space->lr_end - space->lr_start);
109                 *range = *space;
110                 space->lr_start = space->lr_end;
111                 rc = 0;
112         } else if (range_is_exhausted(space)) {
113                 CERROR("sequences space is exhausted\n");
114                 rc = -ENOSPC;
115         } else {
116                 range_alloc(range, space, LUSTRE_SEQ_SUPER_WIDTH);
117                 rc = 0;
118         }
119
120         rc = seq_server_write_state(seq, ctx);
121         if (rc) {
122                 CERROR("can't save state, rc = %d\n",
123                        rc);
124         }
125
126         if (rc == 0) {
127                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated super-sequence "
128                        "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
129         }
130         
131         RETURN(rc);
132 }
133
134 static int
135 seq_server_alloc_super(struct lu_server_seq *seq,
136                        struct lu_range *range,
137                        const struct lu_context *ctx)
138 {
139         int rc;
140         ENTRY;
141
142         down(&seq->seq_sem);
143         rc = __seq_server_alloc_super(seq, range, ctx);
144         up(&seq->seq_sem);
145         
146         RETURN(rc);
147 }
148
149 static int
150 __seq_server_alloc_meta(struct lu_server_seq *seq,
151                         struct lu_range *range,
152                         const struct lu_context *ctx)
153 {
154         struct lu_range *super = &seq->seq_super;
155         int rc = 0;
156         ENTRY;
157
158         LASSERT(range_is_sane(super));
159
160         /* XXX: here we should avoid cascading RPCs using kind of async
161          * preallocation when meta-sequence is close to exhausting. */
162         if (range_is_exhausted(super)) {
163                 if (!seq->seq_cli) {
164                         CERROR("no seq-controller client is setup\n");
165                         RETURN(-EOPNOTSUPP);
166                 }
167
168                 /* allocate new super-sequence. */
169                 up(&seq->seq_sem);
170                 rc = seq_client_alloc_super(seq->seq_cli);
171                 down(&seq->seq_sem);
172                 if (rc) {
173                         CERROR("can't allocate new super-sequence, "
174                                "rc %d\n", rc);
175                         RETURN(rc);
176                 }
177
178                 /* saving new range into allocation space. */
179                 *super = seq->seq_cli->seq_range;
180                 LASSERT(range_is_sane(super));
181         }
182         range_alloc(range, super, LUSTRE_SEQ_META_WIDTH);
183
184         rc = seq_server_write_state(seq, ctx);
185         if (rc) {
186                 CERROR("can't save state, rc = %d\n",
187                        rc);
188         }
189
190         if (rc == 0) {
191                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated meta-sequence "
192                        "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
193         }
194
195         RETURN(rc);
196 }
197
198 static int
199 seq_server_alloc_meta(struct lu_server_seq *seq,
200                       struct lu_range *range,
201                       const struct lu_context *ctx)
202 {
203         int rc;
204         ENTRY;
205
206         down(&seq->seq_sem);
207         rc = __seq_server_alloc_meta(seq, range, ctx);
208         up(&seq->seq_sem);
209         
210         RETURN(rc);
211 }
212
213 static int
214 seq_server_handle(struct lu_server_seq *seq,
215                   const struct lu_context *ctx, 
216                   struct lu_range *range,
217                   __u32 opc)
218 {
219         int rc;
220         ENTRY;
221
222         switch (opc) {
223         case SEQ_ALLOC_SUPER:
224                 rc = seq_server_alloc_super(seq, range, ctx);
225                 break;
226         case SEQ_ALLOC_META:
227                 rc = seq_server_alloc_meta(seq, range, ctx);
228                 break;
229         default:
230                 rc = -EINVAL;
231                 break;
232         }
233
234         RETURN(rc);
235 }
236
237 static int
238 seq_req_handle0(const struct lu_context *ctx,
239                 struct lu_server_seq *seq, 
240                 struct ptlrpc_request *req) 
241 {
242         int rep_buf_size[2] = { 0, };
243         struct req_capsule pill;
244         struct lu_range *out;
245         int rc = -EPROTO;
246         __u32 *opc;
247         ENTRY;
248
249         req_capsule_init(&pill, req, RCL_SERVER,
250                          rep_buf_size);
251
252         req_capsule_set(&pill, &RQF_SEQ_QUERY);
253         req_capsule_pack(&pill);
254
255         opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
256         if (opc != NULL) {
257                 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
258                 if (out == NULL) {
259                         CERROR("can't get range buffer\n");
260                         GOTO(out_pill, rc= -EPROTO);
261                 }
262                 rc = seq_server_handle(seq, ctx, out, *opc);
263         } else {
264                 CERROR("cannot unpack client request\n");
265         }
266
267 out_pill:
268         EXIT;
269         req_capsule_fini(&pill);
270         return rc;
271 }
272
273 static int 
274 seq_req_handle(struct ptlrpc_request *req) 
275 {
276         int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
277         const struct lu_context *ctx;
278         struct lu_site    *site;
279         int rc = -EPROTO;
280         ENTRY;
281
282         OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
283         
284         ctx = req->rq_svc_thread->t_ctx;
285         LASSERT(ctx != NULL);
286         LASSERT(ctx->lc_thread == req->rq_svc_thread);
287         if (req->rq_reqmsg->opc == SEQ_QUERY) {
288                 if (req->rq_export != NULL) {
289                         struct obd_device *obd;
290
291                         obd = req->rq_export->exp_obd;
292                         site = obd->obd_lu_dev->ld_site;
293                         LASSERT(site != NULL);
294                         
295                         rc = seq_req_handle0(ctx, site->ls_server_seq, req);
296                 } else {
297                         CERROR("Unconnected request\n");
298                         req->rq_status = -ENOTCONN;
299                         GOTO(out, rc = -ENOTCONN);
300                 }
301         } else {
302                 CERROR("Wrong opcode: %d\n",
303                        req->rq_reqmsg->opc);
304                 req->rq_status = -ENOTSUPP;
305                 rc = ptlrpc_error(req);
306                 RETURN(rc);
307         }
308
309         EXIT;
310 out:
311         target_send_reply(req, rc, fail);
312         return 0;
313
314
315 /* assigns client to sequence controller node */
316 int
317 seq_server_controller(struct lu_server_seq *seq,
318                       struct lu_client_seq *cli,
319                       const struct lu_context *ctx)
320 {
321         int rc = 0;
322         ENTRY;
323         
324         LASSERT(cli != NULL);
325
326         if (seq->seq_cli) {
327                 CERROR("SEQ-MGR(srv): sequence-controller "
328                        "is already assigned\n");
329                 RETURN(-EINVAL);
330         }
331
332         CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): assign "
333                "sequence controller client %s\n",
334                cli->seq_exp->exp_client_uuid.uuid);
335
336         down(&seq->seq_sem);
337         
338         /* assign controller */
339         seq->seq_cli = cli;
340
341         /* get new range from controller only if super-sequence is not yet
342          * initialized from backing store or something else. */
343         if (range_is_zero(&seq->seq_super)) {
344                 /* release sema to avoid deadlock for case we're asking our
345                  * selves. */
346                 up(&seq->seq_sem);
347                 rc = seq_client_alloc_super(cli);
348                 down(&seq->seq_sem);
349                 
350                 if (rc) {
351                         CERROR("can't allocate super-sequence, "
352                                "rc %d\n", rc);
353                         RETURN(rc);
354                 }
355
356                 /* take super-seq from client seq mgr */
357                 LASSERT(range_is_sane(&cli->seq_range));
358
359                 seq->seq_super = cli->seq_range;
360
361                 /* save init seq to backing store. */
362                 rc = seq_server_write_state(seq, ctx);
363                 if (rc) {
364                         CERROR("can't write sequence state, "
365                                "rc = %d\n", rc);
366                 }
367         }
368         up(&seq->seq_sem);
369         RETURN(rc);
370 }
371 EXPORT_SYMBOL(seq_server_controller);
372
373 int
374 seq_server_init(struct lu_server_seq *seq,
375                 struct dt_device *dev, int flags,
376                 const struct lu_context *ctx) 
377 {
378         int rc; 
379         struct ptlrpc_service_conf seq_conf = { 
380                 .psc_nbufs = MDS_NBUFS, 
381                 .psc_bufsize = MDS_BUFSIZE, 
382                 .psc_max_req_size = MDS_MAXREQSIZE,
383                 .psc_max_reply_size = MDS_MAXREPSIZE,
384                 .psc_req_portal = MDS_SEQ_PORTAL,
385                 .psc_rep_portal = MDC_REPLY_PORTAL,
386                 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT, 
387                 .psc_num_threads = SEQ_NUM_THREADS
388         };
389         ENTRY;
390
391         LASSERT(dev != NULL);
392
393         seq->seq_dev = dev;
394         seq->seq_cli = NULL;
395         seq->seq_flags = flags;
396         sema_init(&seq->seq_sem, 1);
397
398         seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
399         seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
400         
401         lu_device_get(&seq->seq_dev->dd_lu_dev);
402
403         /* request backing store for saved sequence info */
404         rc = seq_server_read_state(seq, ctx);
405         if (rc == -ENODATA) {
406                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): no data on "
407                        "disk found, waiting for controller assign\n");
408         } else if (rc) {
409                 CERROR("can't read sequence state, rc = %d\n",
410                        rc);
411                 GOTO(out, rc);
412         }
413         
414         seq->seq_service =  ptlrpc_init_svc_conf(&seq_conf,
415                                                  seq_req_handle,
416                                                  LUSTRE_SEQ0_NAME,
417                                                  seq->seq_proc_entry, 
418                                                  NULL); 
419         if (seq->seq_service != NULL)
420                 rc = ptlrpc_start_threads(NULL, seq->seq_service,
421                                           LUSTRE_SEQ0_NAME); 
422         else 
423                 rc = -ENOMEM; 
424
425         EXIT;
426
427 out:
428         if (rc)
429                 seq_server_fini(seq, ctx);
430         else
431                 CDEBUG(D_INFO|D_WARNING, "Server Sequence "
432                        "Manager\n");
433         return rc;
434
435 EXPORT_SYMBOL(seq_server_init);
436
437 void
438 seq_server_fini(struct lu_server_seq *seq,
439                 const struct lu_context *ctx) 
440 {
441         ENTRY;
442
443         if (seq->seq_service != NULL) {
444                 ptlrpc_unregister_service(seq->seq_service);
445                 seq->seq_service = NULL;
446         }
447
448         if (seq->seq_dev != NULL) {
449                 lu_device_put(&seq->seq_dev->dd_lu_dev);
450                 seq->seq_dev = NULL;
451         }
452         
453         CDEBUG(D_INFO|D_WARNING, "Server Sequence "
454                "Manager\n");
455         EXIT;
456 }
457 EXPORT_SYMBOL(seq_server_fini);
458
459 static int fid_init(void)
460 {
461         ENTRY;
462         RETURN(0);
463 }
464
465 static int fid_fini(void)
466 {
467         ENTRY;
468         RETURN(0);
469 }
470
471 static int 
472 __init fid_mod_init(void) 
473
474 {
475         /* init caches if any */
476         fid_init();
477         return 0;
478 }
479
480 static void 
481 __exit fid_mod_exit(void) 
482 {
483         /* free caches if any */
484         fid_fini();
485         return;
486 }
487
488 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
489 MODULE_DESCRIPTION("Lustre FID Module");
490 MODULE_LICENSE("GPL");
491
492 cfs_module(fid, "0.0.4", fid_mod_init, fid_mod_exit);
493 #endif