Whamcloud - gitweb
- count -EAGAIN during close() as valid answer
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include <lustre_mdc.h>
54 #include "fld_internal.h"
55
56 static int fld_rrb_hash(struct lu_client_fld *fld,
57                         seqno_t seq)
58 {
59         LASSERT(fld->lcf_count > 0);
60         return do_div(seq, fld->lcf_count);
61 }
62
63 static struct lu_fld_target *
64 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
65 {
66         struct lu_fld_target *target;
67         int hash;
68         ENTRY;
69
70         hash = fld_rrb_hash(fld, seq);
71
72         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
73                 if (target->ft_idx == hash)
74                         RETURN(target);
75         }
76
77         CERROR("%s: Can't find target by hash %d (seq "LPX64"). "
78                "Targets (%d):\n", fld->lcf_name, hash, seq,
79                fld->lcf_count);
80
81         list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
82                 const char *srv_name = target->ft_srv != NULL  ?
83                         target->ft_srv->lsf_name : "<null>";
84                 const char *exp_name = target->ft_exp != NULL ?
85                         (char *)target->ft_exp->exp_obd->obd_uuid.uuid :
86                         "<null>";
87
88                 CERROR("  exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64"\n",
89                        target->ft_exp, exp_name, target->ft_srv,
90                        srv_name, target->ft_idx);
91         }
92
93         /*
94          * If target is not found, there is logical error anyway, so here is
95          * LBUG() to catch this situation.
96          */
97         LBUG();
98         RETURN(NULL);
99 }
100
101 static int fld_dht_hash(struct lu_client_fld *fld,
102                         seqno_t seq)
103 {
104         /* XXX: here should be DHT hash */
105         return fld_rrb_hash(fld, seq);
106 }
107
108 static struct lu_fld_target *
109 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
110 {
111         /* XXX: here should be DHT scan code */
112         return fld_rrb_scan(fld, seq);
113 }
114
115 struct lu_fld_hash fld_hash[3] = {
116         {
117                 .fh_name = "DHT",
118                 .fh_hash_func = fld_dht_hash,
119                 .fh_scan_func = fld_dht_scan
120         },
121         {
122                 .fh_name = "RRB",
123                 .fh_hash_func = fld_rrb_hash,
124                 .fh_scan_func = fld_rrb_scan
125         },
126         {
127                 0,
128         }
129 };
130
131 static struct lu_fld_target *
132 fld_client_get_target(struct lu_client_fld *fld,
133                       seqno_t seq)
134 {
135         struct lu_fld_target *target;
136         ENTRY;
137
138         LASSERT(fld->lcf_hash != NULL);
139
140         spin_lock(&fld->lcf_lock);
141         target = fld->lcf_hash->fh_scan_func(fld, seq);
142         spin_unlock(&fld->lcf_lock);
143
144         if (target != NULL) {
145                 CDEBUG(D_INFO, "%s: Found target (idx "LPU64
146                        ") by seq "LPX64"\n", fld->lcf_name,
147                        target->ft_idx, seq);
148         }
149
150         RETURN(target);
151 }
152
153 /*
154  * Add export to FLD. This is usually done by CMM and LMV as they are main users
155  * of FLD module.
156  */
157 int fld_client_add_target(struct lu_client_fld *fld,
158                           struct lu_fld_target *tar)
159 {
160         const char *name = fld_target_name(tar);
161         struct lu_fld_target *target, *tmp;
162         ENTRY;
163
164         LASSERT(tar != NULL);
165         LASSERT(name != NULL);
166         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
167
168         if (fld->lcf_flags != LUSTRE_FLD_INIT) {
169                 CERROR("%s: Attempt to add target %s (idx "LPU64") "
170                        "on fly - skip it\n", fld->lcf_name, name,
171                        tar->ft_idx);
172                 RETURN(0);
173         } else {
174                 CDEBUG(D_INFO, "%s: Adding target %s (idx "
175                        LPU64")\n", fld->lcf_name, name, tar->ft_idx);
176         }
177
178         OBD_ALLOC_PTR(target);
179         if (target == NULL)
180                 RETURN(-ENOMEM);
181
182         spin_lock(&fld->lcf_lock);
183         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
184                 if (tmp->ft_idx == tar->ft_idx) {
185                         spin_unlock(&fld->lcf_lock);
186                         OBD_FREE_PTR(target);
187                         CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
188                                name, fld_target_name(tmp), tmp->ft_idx);
189                         RETURN(-EEXIST);
190                 }
191         }
192
193         target->ft_exp = tar->ft_exp;
194         if (target->ft_exp != NULL)
195                 class_export_get(target->ft_exp);
196         target->ft_srv = tar->ft_srv;
197         target->ft_idx = tar->ft_idx;
198
199         list_add_tail(&target->ft_chain,
200                       &fld->lcf_targets);
201
202         fld->lcf_count++;
203         spin_unlock(&fld->lcf_lock);
204
205         RETURN(0);
206 }
207 EXPORT_SYMBOL(fld_client_add_target);
208
209 /* Remove export from FLD */
210 int fld_client_del_target(struct lu_client_fld *fld,
211                           __u64 idx)
212 {
213         struct lu_fld_target *target, *tmp;
214         ENTRY;
215
216         spin_lock(&fld->lcf_lock);
217         list_for_each_entry_safe(target, tmp,
218                                  &fld->lcf_targets, ft_chain) {
219                 if (target->ft_idx == idx) {
220                         fld->lcf_count--;
221                         list_del(&target->ft_chain);
222                         spin_unlock(&fld->lcf_lock);
223
224                         if (target->ft_exp != NULL)
225                                 class_export_put(target->ft_exp);
226
227                         OBD_FREE_PTR(target);
228                         RETURN(0);
229                 }
230         }
231         spin_unlock(&fld->lcf_lock);
232         RETURN(-ENOENT);
233 }
234 EXPORT_SYMBOL(fld_client_del_target);
235
236 static void fld_client_proc_fini(struct lu_client_fld *fld);
237
238 #ifdef LPROCFS
239 static int fld_client_proc_init(struct lu_client_fld *fld)
240 {
241         int rc;
242         ENTRY;
243
244         fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
245                                              fld_type_proc_dir,
246                                              NULL, NULL);
247
248         if (IS_ERR(fld->lcf_proc_dir)) {
249                 CERROR("%s: LProcFS failed in fld-init\n",
250                        fld->lcf_name);
251                 rc = PTR_ERR(fld->lcf_proc_dir);
252                 RETURN(rc);
253         }
254
255         rc = lprocfs_add_vars(fld->lcf_proc_dir,
256                               fld_client_proc_list, fld);
257         if (rc) {
258                 CERROR("%s: Can't init FLD proc, rc %d\n",
259                        fld->lcf_name, rc);
260                 GOTO(out_cleanup, rc);
261         }
262
263         RETURN(0);
264
265 out_cleanup:
266         fld_client_proc_fini(fld);
267         return rc;
268 }
269
270 static void fld_client_proc_fini(struct lu_client_fld *fld)
271 {
272         ENTRY;
273         if (fld->lcf_proc_dir) {
274                 if (!IS_ERR(fld->lcf_proc_dir))
275                         lprocfs_remove(fld->lcf_proc_dir);
276                 fld->lcf_proc_dir = NULL;
277         }
278         EXIT;
279 }
280 #else
/* Build without LPROCFS: there is nothing to register, report success. */
static int fld_client_proc_init(struct lu_client_fld *fld)
{
        int rc = 0;

        return rc;
}
285
/* Build without LPROCFS: nothing was registered, so nothing to remove. */
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
}
290 #endif
291
292 static inline int hash_is_sane(int hash)
293 {
294         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
295 }
296
297 int fld_client_init(struct lu_client_fld *fld,
298                     const char *prefix, int hash)
299 {
300 #ifdef __KERNEL__
301         int cache_size, cache_threshold;
302 #endif
303         int rc;
304         ENTRY;
305
306         LASSERT(fld != NULL);
307
308         snprintf(fld->lcf_name, sizeof(fld->lcf_name),
309                  "cli-%s", prefix);
310
311         if (!hash_is_sane(hash)) {
312                 CERROR("%s: Wrong hash function %#x\n",
313                        fld->lcf_name, hash);
314                 RETURN(-EINVAL);
315         }
316
317         fld->lcf_count = 0;
318         spin_lock_init(&fld->lcf_lock);
319         fld->lcf_hash = &fld_hash[hash];
320         fld->lcf_flags = LUSTRE_FLD_INIT;
321         INIT_LIST_HEAD(&fld->lcf_targets);
322
323 #ifdef __KERNEL__
324         cache_size = FLD_CLIENT_CACHE_SIZE /
325                 sizeof(struct fld_cache_entry);
326
327         cache_threshold = cache_size *
328                 FLD_CLIENT_CACHE_THRESHOLD / 100;
329
330         fld->lcf_cache = fld_cache_init(fld->lcf_name,
331                                         FLD_CLIENT_HTABLE_SIZE,
332                                         cache_size, cache_threshold);
333         if (IS_ERR(fld->lcf_cache)) {
334                 rc = PTR_ERR(fld->lcf_cache);
335                 fld->lcf_cache = NULL;
336                 GOTO(out, rc);
337         }
338 #endif
339
340         rc = fld_client_proc_init(fld);
341         if (rc)
342                 GOTO(out, rc);
343         EXIT;
344 out:
345         if (rc)
346                 fld_client_fini(fld);
347         else
348                 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n",
349                        fld->lcf_name, fld->lcf_hash->fh_name);
350         return rc;
351 }
352 EXPORT_SYMBOL(fld_client_init);
353
354 void fld_client_fini(struct lu_client_fld *fld)
355 {
356         struct lu_fld_target *target, *tmp;
357         ENTRY;
358
359         fld_client_proc_fini(fld);
360
361         spin_lock(&fld->lcf_lock);
362         list_for_each_entry_safe(target, tmp,
363                                  &fld->lcf_targets, ft_chain) {
364                 fld->lcf_count--;
365                 list_del(&target->ft_chain);
366                 if (target->ft_exp != NULL)
367                         class_export_put(target->ft_exp);
368                 OBD_FREE_PTR(target);
369         }
370         spin_unlock(&fld->lcf_lock);
371
372 #ifdef __KERNEL__
373         if (fld->lcf_cache != NULL) {
374                 if (!IS_ERR(fld->lcf_cache))
375                         fld_cache_fini(fld->lcf_cache);
376                 fld->lcf_cache = NULL;
377         }
378 #endif
379
380         EXIT;
381 }
382 EXPORT_SYMBOL(fld_client_fini);
383
384 static int fld_client_rpc(struct obd_export *exp,
385                           struct md_fld *mf, __u32 fld_op)
386 {
387         int size[3] = { sizeof(struct ptlrpc_body),
388                         sizeof(__u32),
389                         sizeof(struct md_fld) };
390         struct ptlrpc_request *req;
391         struct req_capsule pill;
392         struct md_fld *pmf;
393         __u32 *op;
394         int rc;
395         ENTRY;
396
397         LASSERT(exp != NULL);
398
399         req = ptlrpc_prep_req(class_exp2cliimp(exp),
400                               LUSTRE_MDS_VERSION,
401                               FLD_QUERY, 3, size,
402                               NULL);
403         if (req == NULL)
404                 RETURN(-ENOMEM);
405
406         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
407         req_capsule_set(&pill, &RQF_FLD_QUERY);
408
409         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
410         *op = fld_op;
411
412         pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
413         *pmf = *mf;
414
415         size[1] = sizeof(struct md_fld);
416         ptlrpc_req_set_repsize(req, 2, size);
417         req->rq_request_portal = FLD_REQUEST_PORTAL;
418
419         if (fld_op != FLD_LOOKUP)
420                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
421         rc = ptlrpc_queue_wait(req);
422         if (fld_op != FLD_LOOKUP)
423                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
424         if (rc)
425                 GOTO(out_req, rc);
426
427         pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
428         if (pmf == NULL)
429                 GOTO(out_req, rc = -EFAULT);
430         *mf = *pmf;
431         EXIT;
432 out_req:
433         req_capsule_fini(&pill);
434         ptlrpc_req_finished(req);
435         return rc;
436 }
437
438 int fld_client_create(struct lu_client_fld *fld,
439                       seqno_t seq, mdsno_t mds,
440                       const struct lu_env *env)
441 {
442         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
443         struct lu_fld_target *target;
444         int rc;
445         ENTRY;
446
447         fld->lcf_flags |= LUSTRE_FLD_RUN;
448         target = fld_client_get_target(fld, seq);
449         LASSERT(target != NULL);
450
451         CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
452                LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
453                seq, mds, fld_target_name(target), target->ft_idx);
454
455 #ifdef __KERNEL__
456         if (target->ft_srv != NULL) {
457                 LASSERT(env != NULL);
458                 rc = fld_server_create(target->ft_srv,
459                                        env, seq, mds);
460         } else {
461 #endif
462                 rc = fld_client_rpc(target->ft_exp,
463                                     &md_fld, FLD_CREATE);
464 #ifdef __KERNEL__
465         }
466 #endif
467
468         if (rc == 0) {
469                 /*
470                  * Do not return result of calling fld_cache_insert()
471                  * here. First of all because it may return -EEXISTS. Another
472                  * reason is that, we do not want to stop proceeding because of
473                  * cache errors.
474                  */
475                 fld_cache_insert(fld->lcf_cache, seq, mds);
476         } else {
477                 CERROR("%s: Can't create FLD entry, rc %d\n",
478                        fld->lcf_name, rc);
479         }
480
481         RETURN(rc);
482 }
483 EXPORT_SYMBOL(fld_client_create);
484
485 int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
486                       const struct lu_env *env)
487 {
488         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
489         struct lu_fld_target *target;
490         int rc;
491         ENTRY;
492
493         fld->lcf_flags |= LUSTRE_FLD_RUN;
494         fld_cache_delete(fld->lcf_cache, seq);
495
496         target = fld_client_get_target(fld, seq);
497         LASSERT(target != NULL);
498
499         CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
500                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
501                fld_target_name(target), target->ft_idx);
502
503 #ifdef __KERNEL__
504         if (target->ft_srv != NULL) {
505                 LASSERT(env != NULL);
506                 rc = fld_server_delete(target->ft_srv,
507                                        env, seq);
508         } else {
509 #endif
510                 rc = fld_client_rpc(target->ft_exp,
511                                     &md_fld, FLD_DELETE);
512 #ifdef __KERNEL__
513         }
514 #endif
515
516         RETURN(rc);
517 }
518 EXPORT_SYMBOL(fld_client_delete);
519
520 int fld_client_lookup(struct lu_client_fld *fld,
521                       seqno_t seq, mdsno_t *mds,
522                       const struct lu_env *env)
523 {
524         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
525         struct lu_fld_target *target;
526         int rc;
527         ENTRY;
528
529         fld->lcf_flags |= LUSTRE_FLD_RUN;
530
531         rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
532         if (rc == 0)
533                 RETURN(0);
534
535         /* Can not find it in the cache */
536         target = fld_client_get_target(fld, seq);
537         LASSERT(target != NULL);
538
539         CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on "
540                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
541                fld_target_name(target), target->ft_idx);
542
543 #ifdef __KERNEL__
544         if (target->ft_srv != NULL) {
545                 LASSERT(env != NULL);
546                 rc = fld_server_lookup(target->ft_srv,
547                                        env, seq, &md_fld.mf_mds);
548         } else {
549 #endif
550                 rc = fld_client_rpc(target->ft_exp,
551                                     &md_fld, FLD_LOOKUP);
552 #ifdef __KERNEL__
553         }
554 #endif
555         if (rc == 0) {
556                 *mds = md_fld.mf_mds;
557
558                 /*
559                  * Do not return error here as well. See previous comment in
560                  * same situation in function fld_client_create().
561                  */
562                 fld_cache_insert(fld->lcf_cache, seq, *mds);
563         }
564         RETURN(rc);
565 }
566 EXPORT_SYMBOL(fld_client_lookup);