Whamcloud - gitweb
- fixes after FLD CR:
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_handler.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: WangDi <wangdi@clusterfs.com>
9  *           Yury Umanets <umka@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FLD
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 # include <linux/jbd.h>
38 # include <asm/div64.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 # include <libcfs/list.h>
42 #endif
43
44 #include <obd.h>
45 #include <obd_class.h>
46 #include <lustre_ver.h>
47 #include <obd_support.h>
48 #include <lprocfs_status.h>
49
50 #include <dt_object.h>
51 #include <md_object.h>
52 #include <lustre_req_layout.h>
53 #include <lustre_fld.h>
54 #include "fld_internal.h"
55
56 #ifdef __KERNEL__
/* Module-wide FLD cache; set up by fld_init() and released by fld_fini().
 * File-scope pointers start out as NULL, so no explicit initializer. */
struct fld_cache_info *fld_cache;
58
59 static int fld_init(void)
60 {
61         int rc = 0;
62         ENTRY;
63         
64         fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
65         if (IS_ERR(fld_cache))
66                 rc = PTR_ERR(fld_cache);
67
68         if (rc != 0)
69                 fld_cache = NULL;
70         
71         RETURN(rc);
72 }
73
74 static void fld_fini(void)
75 {
76         ENTRY;
77         if (fld_cache != NULL) {
78                 fld_cache_fini(fld_cache);
79                 fld_cache = NULL;
80         }
81         EXIT;
82 }
83
84 static int __init fld_mod_init(void)
85 {
86         fld_init();
87         return 0;
88 }
89
90 static void __exit fld_mod_exit(void)
91 {
92         fld_fini();
93         return;
94 }
95
96 /* insert index entry and update cache */
97 static int
98 fld_server_create(struct lu_server_fld *fld,
99                   const struct lu_context *ctx,
100                   __u64 seq, mdsno_t mds)
101 {
102         int rc;
103         ENTRY;
104         
105         rc = fld_index_create(fld, ctx, seq, mds);
106         if (rc == 0) {
107                 /* do not return result of calling fld_cache_insert()
108                  * here. First of all because it may return -EEXISTS. Another
109                  * reason is that, we do not want to stop proceeding because of
110                  * cache errors. --umka */
111                 fld_cache_insert(fld_cache, seq, mds);
112         }
113         RETURN(rc);
114 }
115
116 /* delete index entry and update cache */
117 static int
118 fld_server_delete(struct lu_server_fld *fld,
119                   const struct lu_context *ctx,
120                   __u64 seq)
121 {
122         ENTRY;
123         fld_cache_delete(fld_cache, seq);
124         RETURN(fld_index_delete(fld, ctx, seq));
125 }
126
127 /* lookup in cache first and then issue index lookup */
128 static int
129 fld_server_lookup(struct lu_server_fld *fld,
130                   const struct lu_context *ctx,
131                   __u64 seq, mdsno_t *mds)
132 {
133         struct fld_cache_entry *flde;
134         int rc;
135         ENTRY;
136         
137         /* lookup it in the cache first */
138         flde = fld_cache_lookup(fld_cache, seq);
139         if (flde != NULL) {
140                 *mds = flde->fce_mds;
141                 RETURN(0);
142         }
143
144         rc = fld_index_lookup(fld, ctx, seq, mds);
145         RETURN(rc);
146 }
147
148 static int
149 fld_server_handle(struct lu_server_fld *fld,
150                   const struct lu_context *ctx,
151                   __u32 opc, struct md_fld *mf)
152 {
153         int rc;
154         ENTRY;
155
156         switch (opc) {
157         case FLD_CREATE:
158                 rc = fld_server_create(fld, ctx,
159                                        mf->mf_seq, mf->mf_mds);
160                 break;
161         case FLD_DELETE:
162                 rc = fld_server_delete(fld, ctx, mf->mf_seq);
163                 break;
164         case FLD_LOOKUP:
165                 rc = fld_server_lookup(fld, ctx,
166                                        mf->mf_seq, &mf->mf_mds);
167                 break;
168         default:
169                 rc = -EINVAL;
170                 break;
171         }
172         RETURN(rc);
173
174 }
175
176 static int
177 fld_req_handle0(const struct lu_context *ctx,
178                 struct lu_server_fld *fld,
179                 struct ptlrpc_request *req)
180 {
181         int rep_buf_size[3] = { 0, };
182         struct req_capsule pill;
183         struct md_fld *in;
184         struct md_fld *out;
185         int rc = -EPROTO;
186         __u32 *opc;
187         ENTRY;
188
189         req_capsule_init(&pill, req, RCL_SERVER,
190                          rep_buf_size);
191
192         req_capsule_set(&pill, &RQF_FLD_QUERY);
193         req_capsule_pack(&pill);
194
195         opc = req_capsule_client_get(&pill, &RMF_FLD_OPC);
196         if (opc != NULL) {
197                 in = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
198                 if (in == NULL) {
199                         CERROR("cannot unpack fld request\n");
200                         GOTO(out_pill, rc = -EPROTO);
201                 }
202                 out = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
203                 if (out == NULL) {
204                         CERROR("cannot allocate fld response\n");
205                         GOTO(out_pill, rc = -EPROTO);
206                 }
207                 *out = *in;
208                 rc = fld_server_handle(fld, ctx, *opc, out);
209         } else {
210                 CERROR("cannot unpack FLD operation\n");
211         }
212         
213 out_pill:
214         EXIT;
215         req_capsule_fini(&pill);
216         return rc;
217 }
218
219 static int fld_req_handle(struct ptlrpc_request *req)
220 {
221         int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
222         const struct lu_context *ctx;
223         struct lu_site    *site;
224         int rc = -EPROTO;
225         ENTRY;
226
227         OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
228
229         ctx = req->rq_svc_thread->t_ctx;
230         LASSERT(ctx != NULL);
231         LASSERT(ctx->lc_thread == req->rq_svc_thread);
232         if (req->rq_reqmsg->opc == FLD_QUERY) {
233                 if (req->rq_export != NULL) {
234                         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
235                         LASSERT(site != NULL);
236                         rc = fld_req_handle0(ctx, site->ls_fld, req);
237                 } else {
238                         CERROR("Unconnected request\n");
239                         req->rq_status = -ENOTCONN;
240                         GOTO(out, rc = -ENOTCONN);
241                 }
242         } else {
243                 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
244                 req->rq_status = -ENOTSUPP;
245                 rc = ptlrpc_error(req);
246                 RETURN(rc);
247         }
248
249         EXIT;
250 out:
251         target_send_reply(req, rc, fail);
252         return 0;
253 }
254
255 #ifdef LPROCFS
256 static int
257 fld_server_proc_init(struct lu_server_fld *fld)
258 {
259         int rc;
260         ENTRY;
261
262         fld->fld_proc_dir = lprocfs_register(fld->fld_name,
263                                              proc_lustre_root,
264                                              NULL, NULL);
265         if (IS_ERR(fld->fld_proc_dir)) {
266                 CERROR("LProcFS failed in fld-init\n");
267                 rc = PTR_ERR(fld->fld_proc_dir);
268                 GOTO(err, rc);
269         }
270
271         fld->fld_proc_entry = lprocfs_register("services",
272                                                fld->fld_proc_dir,
273                                                NULL, NULL);
274         if (IS_ERR(fld->fld_proc_entry)) {
275                 CERROR("LProcFS failed in fld-init\n");
276                 rc = PTR_ERR(fld->fld_proc_entry);
277                 GOTO(err_type, rc);
278         }
279
280         rc = lprocfs_add_vars(fld->fld_proc_dir,
281                               fld_server_proc_list, fld);
282         if (rc) {
283                 CERROR("can't init FLD proc, rc %d\n", rc);
284                 GOTO(err_type, rc);
285         }
286
287         RETURN(0);
288
289 err_type:
290         lprocfs_remove(fld->fld_proc_dir);
291 err:
292         fld->fld_proc_dir = NULL;
293         fld->fld_proc_entry = NULL;
294         return rc;
295 }
296
297 static void
298 fld_server_proc_fini(struct lu_server_fld *fld)
299 {
300         ENTRY;
301         if (fld->fld_proc_entry) {
302                 lprocfs_remove(fld->fld_proc_entry);
303                 fld->fld_proc_entry = NULL;
304         }
305
306         if (fld->fld_proc_dir) {
307                 lprocfs_remove(fld->fld_proc_dir);
308                 fld->fld_proc_dir = NULL;
309         }
310         EXIT;
311 }
312 #endif
313
314 int
315 fld_server_init(struct lu_server_fld *fld,
316                 const struct lu_context *ctx,
317                 const char *uuid,
318                 struct dt_device *dt)
319 {
320         int rc;
321         struct ptlrpc_service_conf fld_conf = {
322                 .psc_nbufs            = MDS_NBUFS,
323                 .psc_bufsize          = MDS_BUFSIZE,
324                 .psc_max_req_size     = MDS_MAXREQSIZE,
325                 .psc_max_reply_size   = MDS_MAXREPSIZE,
326                 .psc_req_portal       = FLD_REQUEST_PORTAL,
327                 .psc_rep_portal       = MDC_REPLY_PORTAL,
328                 .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
329                 .psc_num_threads      = FLD_NUM_THREADS
330         };
331         ENTRY;
332
333         fld->fld_dt = dt;
334         lu_device_get(&dt->dd_lu_dev);
335
336         snprintf(fld->fld_name, sizeof(fld->fld_name),
337                  "%s-%s", LUSTRE_FLD_NAME, uuid);
338         
339         rc = fld_index_init(fld, ctx);
340         if (rc)
341                 GOTO(out, rc);
342
343 #ifdef LPROCFS
344         rc = fld_server_proc_init(fld);
345         if (rc)
346                 GOTO(out, rc);
347 #endif
348
349         fld->fld_service =
350                 ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
351                                      LUSTRE_FLD_NAME,
352                                      fld->fld_proc_entry, NULL);
353         if (fld->fld_service != NULL)
354                 rc = ptlrpc_start_threads(NULL, fld->fld_service,
355                                           LUSTRE_FLD_NAME);
356         else
357                 rc = -ENOMEM;
358
359         EXIT;
360 out:
361         if (rc)
362                 fld_server_fini(fld, ctx);
363         else
364                 CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
365         return rc;
366 }
367 EXPORT_SYMBOL(fld_server_init);
368
369 void
370 fld_server_fini(struct lu_server_fld *fld,
371                 const struct lu_context *ctx)
372 {
373         ENTRY;
374
375 #ifdef LPROCFS
376         fld_server_proc_fini(fld);
377 #endif
378         
379         if (fld->fld_service != NULL) {
380                 ptlrpc_unregister_service(fld->fld_service);
381                 fld->fld_service = NULL;
382         }
383
384         if (fld->fld_dt != NULL) {
385                 lu_device_put(&fld->fld_dt->dd_lu_dev);
386                 fld_index_fini(fld, ctx);
387                 fld->fld_dt = NULL;
388         }
389         CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
390         EXIT;
391 }
392 EXPORT_SYMBOL(fld_server_fini);
393
394 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
395 MODULE_DESCRIPTION("Lustre FLD");
396 MODULE_LICENSE("GPL");
397
398 cfs_module(mdd, "0.0.4", fld_mod_init, fld_mod_exit);
399 #endif