Whamcloud - gitweb
- commit not finished fid_is_local() related things, Nikita will finish OSD stuff...
[fs/lustre-release.git] / lustre / fld / fld_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_handler.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: WangDi <wangdi@clusterfs.com>
9  *           Yury Umanets <umka@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FLD
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 # include <linux/jbd.h>
38 # include <asm/div64.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 # include <libcfs/list.h>
42 #endif
43
44 #include <obd.h>
45 #include <obd_class.h>
46 #include <lustre_ver.h>
47 #include <obd_support.h>
48 #include <lprocfs_status.h>
49
50 #include <dt_object.h>
51 #include <md_object.h>
52 #include <lustre_req_layout.h>
53 #include <lustre_fld.h>
54 #include "fld_internal.h"
55
56 #ifdef __KERNEL__
57 struct fld_cache_info *fld_cache = NULL;
58
59 static int fld_init(void)
60 {
61         int rc = 0;
62         ENTRY;
63         
64         fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
65         if (IS_ERR(fld_cache))
66                 rc = PTR_ERR(fld_cache);
67
68         if (rc != 0)
69                 fld_cache = NULL;
70         
71         RETURN(rc);
72 }
73
74 static void fld_fini(void)
75 {
76         ENTRY;
77         if (fld_cache != NULL) {
78                 fld_cache_fini(fld_cache);
79                 fld_cache = NULL;
80         }
81         EXIT;
82 }
83
84 static int __init fld_mod_init(void)
85 {
86         fld_init();
87         return 0;
88 }
89
90 static void __exit fld_mod_exit(void)
91 {
92         fld_fini();
93         return;
94 }
95
96 /* insert index entry and update cache */
97 int
98 fld_server_create(struct lu_server_fld *fld,
99                   const struct lu_context *ctx,
100                   __u64 seq, mdsno_t mds)
101 {
102         int rc;
103         ENTRY;
104         
105         rc = fld_index_create(fld, ctx, seq, mds);
106         if (rc == 0) {
107                 /* do not return result of calling fld_cache_insert()
108                  * here. First of all because it may return -EEXISTS. Another
109                  * reason is that, we do not want to stop proceeding because of
110                  * cache errors. --umka */
111                 fld_cache_insert(fld_cache, seq, mds);
112         }
113         RETURN(rc);
114 }
115 EXPORT_SYMBOL(fld_server_create);
116
117 /* delete index entry and update cache */
118 int
119 fld_server_delete(struct lu_server_fld *fld,
120                   const struct lu_context *ctx,
121                   __u64 seq)
122 {
123         ENTRY;
124         fld_cache_delete(fld_cache, seq);
125         RETURN(fld_index_delete(fld, ctx, seq));
126 }
127 EXPORT_SYMBOL(fld_server_delete);
128
129 /* lookup in cache first and then issue index lookup */
130 int
131 fld_server_lookup(struct lu_server_fld *fld,
132                   const struct lu_context *ctx,
133                   __u64 seq, mdsno_t *mds)
134 {
135         struct fld_cache_entry *flde;
136         int rc;
137         ENTRY;
138         
139         /* lookup it in the cache first */
140         flde = fld_cache_lookup(fld_cache, seq);
141         if (flde != NULL) {
142                 *mds = flde->fce_mds;
143                 RETURN(0);
144         }
145
146         rc = fld_index_lookup(fld, ctx, seq, mds);
147         RETURN(rc);
148 }
149 EXPORT_SYMBOL(fld_server_lookup);
150
151 static int
152 fld_server_handle(struct lu_server_fld *fld,
153                   const struct lu_context *ctx,
154                   __u32 opc, struct md_fld *mf)
155 {
156         int rc;
157         ENTRY;
158
159         switch (opc) {
160         case FLD_CREATE:
161                 rc = fld_server_create(fld, ctx,
162                                        mf->mf_seq, mf->mf_mds);
163                 break;
164         case FLD_DELETE:
165                 rc = fld_server_delete(fld, ctx, mf->mf_seq);
166                 break;
167         case FLD_LOOKUP:
168                 rc = fld_server_lookup(fld, ctx,
169                                        mf->mf_seq, &mf->mf_mds);
170                 break;
171         default:
172                 rc = -EINVAL;
173                 break;
174         }
175         RETURN(rc);
176
177 }
178
179 static int
180 fld_req_handle0(const struct lu_context *ctx,
181                 struct lu_server_fld *fld,
182                 struct ptlrpc_request *req)
183 {
184         int rep_buf_size[3] = { 0, };
185         struct req_capsule pill;
186         struct md_fld *in;
187         struct md_fld *out;
188         int rc = -EPROTO;
189         __u32 *opc;
190         ENTRY;
191
192         req_capsule_init(&pill, req, RCL_SERVER,
193                          rep_buf_size);
194
195         req_capsule_set(&pill, &RQF_FLD_QUERY);
196         req_capsule_pack(&pill);
197
198         opc = req_capsule_client_get(&pill, &RMF_FLD_OPC);
199         if (opc != NULL) {
200                 in = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
201                 if (in == NULL) {
202                         CERROR("cannot unpack fld request\n");
203                         GOTO(out_pill, rc = -EPROTO);
204                 }
205                 out = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
206                 if (out == NULL) {
207                         CERROR("cannot allocate fld response\n");
208                         GOTO(out_pill, rc = -EPROTO);
209                 }
210                 *out = *in;
211                 rc = fld_server_handle(fld, ctx, *opc, out);
212         } else {
213                 CERROR("cannot unpack FLD operation\n");
214         }
215         
216 out_pill:
217         EXIT;
218         req_capsule_fini(&pill);
219         return rc;
220 }
221
222 static int fld_req_handle(struct ptlrpc_request *req)
223 {
224         int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
225         const struct lu_context *ctx;
226         struct lu_site    *site;
227         int rc = -EPROTO;
228         ENTRY;
229
230         OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
231
232         ctx = req->rq_svc_thread->t_ctx;
233         LASSERT(ctx != NULL);
234         LASSERT(ctx->lc_thread == req->rq_svc_thread);
235         if (req->rq_reqmsg->opc == FLD_QUERY) {
236                 if (req->rq_export != NULL) {
237                         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
238                         LASSERT(site != NULL);
239                         rc = fld_req_handle0(ctx, site->ls_fld, req);
240                 } else {
241                         CERROR("Unconnected request\n");
242                         req->rq_status = -ENOTCONN;
243                         GOTO(out, rc = -ENOTCONN);
244                 }
245         } else {
246                 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
247                 req->rq_status = -ENOTSUPP;
248                 rc = ptlrpc_error(req);
249                 RETURN(rc);
250         }
251
252         EXIT;
253 out:
254         target_send_reply(req, rc, fail);
255         return 0;
256 }
257
258 #ifdef LPROCFS
259 static int
260 fld_server_proc_init(struct lu_server_fld *fld)
261 {
262         int rc;
263         ENTRY;
264
265         fld->fld_proc_dir = lprocfs_register(fld->fld_name,
266                                              proc_lustre_root,
267                                              NULL, NULL);
268         if (IS_ERR(fld->fld_proc_dir)) {
269                 CERROR("LProcFS failed in fld-init\n");
270                 rc = PTR_ERR(fld->fld_proc_dir);
271                 GOTO(err, rc);
272         }
273
274         fld->fld_proc_entry = lprocfs_register("services",
275                                                fld->fld_proc_dir,
276                                                NULL, NULL);
277         if (IS_ERR(fld->fld_proc_entry)) {
278                 CERROR("LProcFS failed in fld-init\n");
279                 rc = PTR_ERR(fld->fld_proc_entry);
280                 GOTO(err_type, rc);
281         }
282
283         rc = lprocfs_add_vars(fld->fld_proc_dir,
284                               fld_server_proc_list, fld);
285         if (rc) {
286                 CERROR("can't init FLD proc, rc %d\n", rc);
287                 GOTO(err_type, rc);
288         }
289
290         RETURN(0);
291
292 err_type:
293         lprocfs_remove(fld->fld_proc_dir);
294 err:
295         fld->fld_proc_dir = NULL;
296         fld->fld_proc_entry = NULL;
297         return rc;
298 }
299
300 static void
301 fld_server_proc_fini(struct lu_server_fld *fld)
302 {
303         ENTRY;
304         if (fld->fld_proc_entry) {
305                 lprocfs_remove(fld->fld_proc_entry);
306                 fld->fld_proc_entry = NULL;
307         }
308
309         if (fld->fld_proc_dir) {
310                 lprocfs_remove(fld->fld_proc_dir);
311                 fld->fld_proc_dir = NULL;
312         }
313         EXIT;
314 }
315 #endif
316
317 int
318 fld_server_init(struct lu_server_fld *fld,
319                 const struct lu_context *ctx,
320                 const char *uuid,
321                 struct dt_device *dt)
322 {
323         int rc;
324         struct ptlrpc_service_conf fld_conf = {
325                 .psc_nbufs            = MDS_NBUFS,
326                 .psc_bufsize          = MDS_BUFSIZE,
327                 .psc_max_req_size     = MDS_MAXREQSIZE,
328                 .psc_max_reply_size   = MDS_MAXREPSIZE,
329                 .psc_req_portal       = FLD_REQUEST_PORTAL,
330                 .psc_rep_portal       = MDC_REPLY_PORTAL,
331                 .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
332                 .psc_num_threads      = FLD_NUM_THREADS
333         };
334         ENTRY;
335
336         fld->fld_dt = dt;
337         lu_device_get(&dt->dd_lu_dev);
338
339         snprintf(fld->fld_name, sizeof(fld->fld_name),
340                  "%s-%s", LUSTRE_FLD_NAME, uuid);
341         
342         rc = fld_index_init(fld, ctx);
343         if (rc)
344                 GOTO(out, rc);
345
346 #ifdef LPROCFS
347         rc = fld_server_proc_init(fld);
348         if (rc)
349                 GOTO(out, rc);
350 #endif
351
352         fld->fld_service =
353                 ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
354                                      LUSTRE_FLD_NAME,
355                                      fld->fld_proc_entry, NULL);
356         if (fld->fld_service != NULL)
357                 rc = ptlrpc_start_threads(NULL, fld->fld_service,
358                                           LUSTRE_FLD_NAME);
359         else
360                 rc = -ENOMEM;
361
362         EXIT;
363 out:
364         if (rc)
365                 fld_server_fini(fld, ctx);
366         else
367                 CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
368         return rc;
369 }
370 EXPORT_SYMBOL(fld_server_init);
371
372 void
373 fld_server_fini(struct lu_server_fld *fld,
374                 const struct lu_context *ctx)
375 {
376         ENTRY;
377
378 #ifdef LPROCFS
379         fld_server_proc_fini(fld);
380 #endif
381         
382         if (fld->fld_service != NULL) {
383                 ptlrpc_unregister_service(fld->fld_service);
384                 fld->fld_service = NULL;
385         }
386
387         if (fld->fld_dt != NULL) {
388                 lu_device_put(&fld->fld_dt->dd_lu_dev);
389                 fld_index_fini(fld, ctx);
390                 fld->fld_dt = NULL;
391         }
392         CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
393         EXIT;
394 }
395 EXPORT_SYMBOL(fld_server_fini);
396
397 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
398 MODULE_DESCRIPTION("Lustre FLD");
399 MODULE_LICENSE("GPL");
400
401 cfs_module(mdd, "0.0.4", fld_mod_init, fld_mod_exit);
402 #endif