Whamcloud - gitweb
379e0bf47566817757e141915666a3bf6fd958d1
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
50
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
54
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58                               const char *status, int locks, int debug_level);
59
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
176 static struct kobj_type class_ktype = {
177         .sysfs_ops      = &lustre_sysfs_ops,
178         .release        = class_sysfs_release,
179 };
180
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 {
184         struct dentry *symlink;
185         struct obd_type *type;
186         int rc;
187
188         type = class_search_type(name);
189         if (type) {
190                 kobject_put(&type->typ_kobj);
191                 return ERR_PTR(-EEXIST);
192         }
193
194         OBD_ALLOC(type, sizeof(*type));
195         if (!type)
196                 return ERR_PTR(-ENOMEM);
197
198         type->typ_kobj.kset = lustre_kset;
199         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200                                   &lustre_kset->kobj, "%s", name);
201         if (rc)
202                 return ERR_PTR(rc);
203
204         symlink = debugfs_create_dir(name, debugfs_lustre_root);
205         type->typ_debugfs_entry = symlink;
206         type->typ_sym_filter = true;
207
208         if (enable_proc) {
209                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210                                                       NULL, NULL);
211                 if (IS_ERR(type->typ_procroot)) {
212                         CERROR("%s: can't create compat proc entry: %d\n",
213                                name, (int)PTR_ERR(type->typ_procroot));
214                         type->typ_procroot = NULL;
215                 }
216         }
217
218         return type;
219 }
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
222
223 #define CLASS_MAX_NAME 1024
224
225 int class_register_type(const struct obd_ops *dt_ops,
226                         const struct md_ops *md_ops,
227                         bool enable_proc,
228                         const char *name, struct lu_device_type *ldt)
229 {
230         struct obd_type *type;
231         int rc;
232
233         ENTRY;
234         /* sanity check */
235         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236
237         type = class_search_type(name);
238         if (type) {
239 #ifdef HAVE_SERVER_SUPPORT
240                 if (type->typ_sym_filter)
241                         goto dir_exist;
242 #endif /* HAVE_SERVER_SUPPORT */
243                 kobject_put(&type->typ_kobj);
244                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245                 RETURN(-EEXIST);
246         }
247
248         OBD_ALLOC(type, sizeof(*type));
249         if (type == NULL)
250                 RETURN(-ENOMEM);
251
252         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253         type->typ_kobj.kset = lustre_kset;
254         kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
256 dir_exist:
257 #endif /* HAVE_SERVER_SUPPORT */
258
259         type->typ_dt_ops = dt_ops;
260         type->typ_md_ops = md_ops;
261
262 #ifdef HAVE_SERVER_SUPPORT
263         if (type->typ_sym_filter) {
264                 type->typ_sym_filter = false;
265                 kobject_put(&type->typ_kobj);
266                 goto setup_ldt;
267         }
268 #endif
269 #ifdef CONFIG_PROC_FS
270         if (enable_proc && !type->typ_procroot) {
271                 type->typ_procroot = lprocfs_register(name,
272                                                       proc_lustre_root,
273                                                       NULL, type);
274                 if (IS_ERR(type->typ_procroot)) {
275                         rc = PTR_ERR(type->typ_procroot);
276                         type->typ_procroot = NULL;
277                         GOTO(failed, rc);
278                 }
279         }
280 #endif
281         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405         INIT_LIST_HEAD(&newdev->obd_evict_list);
406         INIT_LIST_HEAD(&newdev->obd_lwp_list);
407
408         llog_group_init(&newdev->obd_olg);
409         /* Detach drops this */
410         atomic_set(&newdev->obd_refcount, 1);
411         lu_ref_init(&newdev->obd_reference);
412         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413
414         newdev->obd_conn_inprogress = 0;
415
416         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417
418         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419                newdev->obd_name, newdev);
420
421         return newdev;
422 }
423
424 /**
425  * Free obd device.
426  *
427  * \param[in] obd obd_device to be freed
428  *
429  * \retval none
430  */
431 void class_free_dev(struct obd_device *obd)
432 {
433         struct obd_type *obd_type = obd->obd_type;
434
435         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438                  "obd %p != obd_devs[%d] %p\n",
439                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  atomic_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Free slot in obd_dev[] used by \a obd.
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         write_lock(&obd_dev_lock);
477         if (obd->obd_minor >= 0) {
478                 LASSERT(obd_devs[obd->obd_minor] == obd);
479                 obd_devs[obd->obd_minor] = NULL;
480                 obd->obd_minor = -1;
481         }
482         write_unlock(&obd_dev_lock);
483 }
484
485 /**
486  * Register obd device.
487  *
488  * Find free slot in obd_devs[], fills it with \a new_obd.
489  *
490  * \param[in] new_obd obd_device to be registered
491  *
492  * \retval 0          success
493  * \retval -EEXIST    device with this name is registered
494  * \retval -EOVERFLOW obd_devs[] is full
495  */
496 int class_register_device(struct obd_device *new_obd)
497 {
498         int ret = 0;
499         int i;
500         int new_obd_minor = 0;
501         bool minor_assign = false;
502         bool retried = false;
503
504 again:
505         write_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd != NULL &&
510                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511
512                         if (!retried) {
513                                 write_unlock(&obd_dev_lock);
514
515                                 /* the obd_device could be waited to be
516                                  * destroyed by the "obd_zombie_impexp_thread".
517                                  */
518                                 obd_zombie_barrier();
519                                 retried = true;
520                                 goto again;
521                         }
522
523                         CERROR("%s: already exists, won't add\n",
524                                obd->obd_name);
525                         /* in case we found a free slot before duplicate */
526                         minor_assign = false;
527                         ret = -EEXIST;
528                         break;
529                 }
530                 if (!minor_assign && obd == NULL) {
531                         new_obd_minor = i;
532                         minor_assign = true;
533                 }
534         }
535
536         if (minor_assign) {
537                 new_obd->obd_minor = new_obd_minor;
538                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540                 obd_devs[new_obd_minor] = new_obd;
541         } else {
542                 if (ret == 0) {
543                         ret = -EOVERFLOW;
544                         CERROR("%s: all %u/%u devices used, increase "
545                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546                                i, class_devno_max(), ret);
547                 }
548         }
549         write_unlock(&obd_dev_lock);
550
551         RETURN(ret);
552 }
553
554 static int class_name2dev_nolock(const char *name)
555 {
556         int i;
557
558         if (!name)
559                 return -1;
560
561         for (i = 0; i < class_devno_max(); i++) {
562                 struct obd_device *obd = class_num2obd(i);
563
564                 if (obd && strcmp(name, obd->obd_name) == 0) {
565                         /* Make sure we finished attaching before we give
566                            out any references */
567                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568                         if (obd->obd_attached) {
569                                 return i;
570                         }
571                         break;
572                 }
573         }
574
575         return -1;
576 }
577
578 int class_name2dev(const char *name)
579 {
580         int i;
581
582         if (!name)
583                 return -1;
584
585         read_lock(&obd_dev_lock);
586         i = class_name2dev_nolock(name);
587         read_unlock(&obd_dev_lock);
588
589         return i;
590 }
591 EXPORT_SYMBOL(class_name2dev);
592
/**
 * Find an attached obd device by name.
 *
 * No reference is taken on the returned device.
 *
 * \param[in] name  device name, may be NULL
 *
 * \retval obd   the device called \a name
 * \retval NULL  no such attached device
 */
struct obd_device *class_name2obd(const char *name)
{
        int dev = class_name2dev(name);

        /* valid indices are 0 .. class_devno_max() - 1 */
        if (dev < 0 || dev >= class_devno_max())
                return NULL;
        return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
602
603 int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 {
605         int i;
606
607         for (i = 0; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
612                         return i;
613                 }
614         }
615
616         return -1;
617 }
618
619 int class_uuid2dev(struct obd_uuid *uuid)
620 {
621         int i;
622
623         read_lock(&obd_dev_lock);
624         i = class_uuid2dev_nolock(uuid);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_uuid2dev);
630
/**
 * Find an obd device by UUID.
 *
 * No reference is taken on the returned device.
 *
 * \param[in] uuid  UUID to look for
 *
 * \retval obd   the first device whose UUID equals \a uuid
 * \retval NULL  no device has that UUID
 */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
        int idx = class_uuid2dev(uuid);

        return idx < 0 ? NULL : class_num2obd(idx);
}
EXPORT_SYMBOL(class_uuid2obd);
639
640 /**
641  * Get obd device from ::obd_devs[]
642  *
643  * \param num [in] array index
644  *
645  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646  *         otherwise return the obd device there.
647  */
648 struct obd_device *class_num2obd(int num)
649 {
650         struct obd_device *obd = NULL;
651
652         if (num < class_devno_max()) {
653                 obd = obd_devs[num];
654                 if (obd == NULL)
655                         return NULL;
656
657                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658                          "%p obd_magic %08x != %08x\n",
659                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660                 LASSERTF(obd->obd_minor == num,
661                          "%p obd_minor %0d != %0d\n",
662                          obd, obd->obd_minor, num);
663         }
664
665         return obd;
666 }
667 EXPORT_SYMBOL(class_num2obd);
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         ptlrpc_connection_put(exp->exp_connection);
950
951         LASSERT(list_empty(&exp->exp_outstanding_replies));
952         LASSERT(list_empty(&exp->exp_uncommitted_replies));
953         LASSERT(list_empty(&exp->exp_req_replay_queue));
954         LASSERT(list_empty(&exp->exp_hp_rpcs));
955         obd_destroy_export(exp);
956         /* self export doesn't hold a reference to an obd, although it
957          * exists until freeing of the obd */
958         if (exp != obd->obd_self_export)
959                 class_decref(obd, "export", exp);
960
961         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
962         kfree_rcu(exp, exp_handle.h_rcu);
963         EXIT;
964 }
965
966 struct obd_export *class_export_get(struct obd_export *exp)
967 {
968         refcount_inc(&exp->exp_handle.h_ref);
969         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970                refcount_read(&exp->exp_handle.h_ref));
971         return exp;
972 }
973 EXPORT_SYMBOL(class_export_get);
974
975 void class_export_put(struct obd_export *exp)
976 {
977         LASSERT(exp != NULL);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981                refcount_read(&exp->exp_handle.h_ref) - 1);
982
983         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984                 struct obd_device *obd = exp->exp_obd;
985
986                 CDEBUG(D_IOCTL, "final put %p/%s\n",
987                        exp, exp->exp_client_uuid.uuid);
988
989                 /* release nid stat refererence */
990                 lprocfs_exp_cleanup(exp);
991
992                 if (exp == obd->obd_self_export) {
993                         /* self export should be destroyed without
994                          * zombie thread as it doesn't hold a
995                          * reference to obd and doesn't hold any
996                          * resources */
997                         class_export_destroy(exp);
998                         /* self export is destroyed, no class
999                          * references exist and it is safe to free
1000                          * obd */
1001                         class_free_dev(obd);
1002                 } else {
1003                         LASSERT(!list_empty(&exp->exp_obd_chain));
1004                         obd_zombie_export_add(exp);
1005                 }
1006
1007         }
1008 }
1009 EXPORT_SYMBOL(class_export_put);
1010
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 {
1013         struct obd_export *export;
1014
1015         export = container_of(ws, struct obd_export, exp_zombie_work);
1016         class_export_destroy(export);
1017 }
1018
1019 /* Creates a new export, adds it to the hash table, and returns a
1020  * pointer to it. The refcount is 2: one for the hash reference, and
1021  * one for the pointer returned by this function. */
1022 struct obd_export *__class_new_export(struct obd_device *obd,
1023                                       struct obd_uuid *cluuid, bool is_self)
1024 {
1025         struct obd_export *export;
1026         int rc = 0;
1027         ENTRY;
1028
1029         OBD_ALLOC_PTR(export);
1030         if (!export)
1031                 return ERR_PTR(-ENOMEM);
1032
1033         export->exp_conn_cnt = 0;
1034         export->exp_lock_hash = NULL;
1035         export->exp_flock_hash = NULL;
1036         /* 2 = class_handle_hash + last */
1037         refcount_set(&export->exp_handle.h_ref, 2);
1038         atomic_set(&export->exp_rpc_count, 0);
1039         atomic_set(&export->exp_cb_count, 0);
1040         atomic_set(&export->exp_locks_count, 0);
1041 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1042         INIT_LIST_HEAD(&export->exp_locks_list);
1043         spin_lock_init(&export->exp_locks_list_guard);
1044 #endif
1045         atomic_set(&export->exp_replay_count, 0);
1046         export->exp_obd = obd;
1047         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1048         spin_lock_init(&export->exp_uncommitted_replies_lock);
1049         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1050         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1051         INIT_HLIST_NODE(&export->exp_handle.h_link);
1052         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1053         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1054         class_handle_hash(&export->exp_handle, export_handle_owner);
1055         export->exp_last_request_time = ktime_get_real_seconds();
1056         spin_lock_init(&export->exp_lock);
1057         spin_lock_init(&export->exp_rpc_lock);
1058         INIT_HLIST_NODE(&export->exp_gen_hash);
1059         spin_lock_init(&export->exp_bl_list_lock);
1060         INIT_LIST_HEAD(&export->exp_bl_list);
1061         INIT_LIST_HEAD(&export->exp_stale_list);
1062         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1063
1064         export->exp_sp_peer = LUSTRE_SP_ANY;
1065         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066         export->exp_client_uuid = *cluuid;
1067         obd_init_export(export);
1068
1069         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1070         export->exp_root_fid.f_seq = 0;
1071         export->exp_root_fid.f_oid = 0;
1072         export->exp_root_fid.f_ver = 0;
1073
1074         spin_lock(&obd->obd_dev_lock);
1075         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1076                 /* shouldn't happen, but might race */
1077                 if (obd->obd_stopping)
1078                         GOTO(exit_unlock, rc = -ENODEV);
1079
1080                 rc = obd_uuid_add(obd, export);
1081                 if (rc != 0) {
1082                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1083                                       obd->obd_name, cluuid->uuid, rc);
1084                         GOTO(exit_unlock, rc = -EALREADY);
1085                 }
1086         }
1087
1088         if (!is_self) {
1089                 class_incref(obd, "export", export);
1090                 list_add_tail(&export->exp_obd_chain_timed,
1091                               &obd->obd_exports_timed);
1092                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1093                 obd->obd_num_exports++;
1094         } else {
1095                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1096                 INIT_LIST_HEAD(&export->exp_obd_chain);
1097         }
1098         spin_unlock(&obd->obd_dev_lock);
1099         RETURN(export);
1100
1101 exit_unlock:
1102         spin_unlock(&obd->obd_dev_lock);
1103         class_handle_unhash(&export->exp_handle);
1104         obd_destroy_export(export);
1105         OBD_FREE_PTR(export);
1106         return ERR_PTR(rc);
1107 }
1108
1109 struct obd_export *class_new_export(struct obd_device *obd,
1110                                     struct obd_uuid *uuid)
1111 {
1112         return __class_new_export(obd, uuid, false);
1113 }
1114 EXPORT_SYMBOL(class_new_export);
1115
1116 struct obd_export *class_new_export_self(struct obd_device *obd,
1117                                          struct obd_uuid *uuid)
1118 {
1119         return __class_new_export(obd, uuid, true);
1120 }
1121
1122 void class_unlink_export(struct obd_export *exp)
1123 {
1124         class_handle_unhash(&exp->exp_handle);
1125
1126         if (exp->exp_obd->obd_self_export == exp) {
1127                 class_export_put(exp);
1128                 return;
1129         }
1130
1131         spin_lock(&exp->exp_obd->obd_dev_lock);
1132         /* delete an uuid-export hashitem from hashtables */
1133         if (exp != exp->exp_obd->obd_self_export)
1134                 obd_uuid_del(exp->exp_obd, exp);
1135
1136 #ifdef HAVE_SERVER_SUPPORT
1137         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1138                 struct tg_export_data   *ted = &exp->exp_target_data;
1139                 struct cfs_hash         *hash;
1140
1141                 /* Because obd_gen_hash will not be released until
1142                  * class_cleanup(), so hash should never be NULL here */
1143                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1144                 LASSERT(hash != NULL);
1145                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1146                              &exp->exp_gen_hash);
1147                 cfs_hash_putref(hash);
1148         }
1149 #endif /* HAVE_SERVER_SUPPORT */
1150
1151         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1152         list_del_init(&exp->exp_obd_chain_timed);
1153         exp->exp_obd->obd_num_exports--;
1154         spin_unlock(&exp->exp_obd->obd_dev_lock);
1155         atomic_inc(&obd_stale_export_num);
1156
1157         /* A reference is kept by obd_stale_exports list */
1158         obd_stale_export_put(exp);
1159 }
1160 EXPORT_SYMBOL(class_unlink_export);
1161
1162 /* Import management functions */
1163 static void obd_zombie_import_free(struct obd_import *imp)
1164 {
1165         struct obd_import_conn *imp_conn;
1166
1167         ENTRY;
1168         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1169                imp->imp_obd->obd_name);
1170
1171         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1172
1173         ptlrpc_connection_put(imp->imp_connection);
1174
1175         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1176                                                     struct obd_import_conn,
1177                                                     oic_item)) != NULL) {
1178                 list_del_init(&imp_conn->oic_item);
1179                 ptlrpc_connection_put(imp_conn->oic_conn);
1180                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1181         }
1182
1183         LASSERT(imp->imp_sec == NULL);
1184         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1185                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1186         class_decref(imp->imp_obd, "import", imp);
1187         OBD_FREE_PTR(imp);
1188         EXIT;
1189 }
1190
1191 struct obd_import *class_import_get(struct obd_import *import)
1192 {
1193         refcount_inc(&import->imp_refcount);
1194         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1195                refcount_read(&import->imp_refcount),
1196                import->imp_obd->obd_name);
1197         return import;
1198 }
1199 EXPORT_SYMBOL(class_import_get);
1200
1201 void class_import_put(struct obd_import *imp)
1202 {
1203         ENTRY;
1204
1205         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1206
1207         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1208                refcount_read(&imp->imp_refcount) - 1,
1209                imp->imp_obd->obd_name);
1210
1211         if (refcount_dec_and_test(&imp->imp_refcount)) {
1212                 CDEBUG(D_INFO, "final put import %p\n", imp);
1213                 obd_zombie_import_add(imp);
1214         }
1215
1216         EXIT;
1217 }
1218 EXPORT_SYMBOL(class_import_put);
1219
1220 static void init_imp_at(struct imp_at *at) {
1221         int i;
1222         at_init(&at->iat_net_latency, 0, 0);
1223         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1224                 /* max service estimates are tracked on the server side, so
1225                    don't use the AT history here, just use the last reported
1226                    val. (But keep hist for proc histogram, worst_ever) */
1227                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1228                         AT_FLG_NOHIST);
1229         }
1230 }
1231
1232 static void obd_zombie_imp_cull(struct work_struct *ws)
1233 {
1234         struct obd_import *import;
1235
1236         import = container_of(ws, struct obd_import, imp_zombie_work);
1237         obd_zombie_import_free(import);
1238 }
1239
1240 struct obd_import *class_new_import(struct obd_device *obd)
1241 {
1242         struct obd_import *imp;
1243         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1244
1245         OBD_ALLOC(imp, sizeof(*imp));
1246         if (imp == NULL)
1247                 return NULL;
1248
1249         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1250         INIT_LIST_HEAD(&imp->imp_replay_list);
1251         INIT_LIST_HEAD(&imp->imp_sending_list);
1252         INIT_LIST_HEAD(&imp->imp_delayed_list);
1253         INIT_LIST_HEAD(&imp->imp_committed_list);
1254         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1255         imp->imp_known_replied_xid = 0;
1256         imp->imp_replay_cursor = &imp->imp_committed_list;
1257         spin_lock_init(&imp->imp_lock);
1258         imp->imp_last_success_conn = 0;
1259         imp->imp_state = LUSTRE_IMP_NEW;
1260         imp->imp_obd = class_incref(obd, "import", imp);
1261         rwlock_init(&imp->imp_sec_lock);
1262         init_waitqueue_head(&imp->imp_recovery_waitq);
1263         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1264
1265         if (curr_pid_ns && curr_pid_ns->child_reaper)
1266                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1267         else
1268                 imp->imp_sec_refpid = 1;
1269
1270         refcount_set(&imp->imp_refcount, 2);
1271         atomic_set(&imp->imp_unregistering, 0);
1272         atomic_set(&imp->imp_reqs, 0);
1273         atomic_set(&imp->imp_inflight, 0);
1274         atomic_set(&imp->imp_replay_inflight, 0);
1275         init_waitqueue_head(&imp->imp_replay_waitq);
1276         atomic_set(&imp->imp_inval_count, 0);
1277         atomic_set(&imp->imp_waiting, 0);
1278         INIT_LIST_HEAD(&imp->imp_conn_list);
1279         init_imp_at(&imp->imp_at);
1280
1281         /* the default magic is V2, will be used in connect RPC, and
1282          * then adjusted according to the flags in request/reply. */
1283         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1284
1285         return imp;
1286 }
1287 EXPORT_SYMBOL(class_new_import);
1288
1289 void class_destroy_import(struct obd_import *import)
1290 {
1291         LASSERT(import != NULL);
1292         LASSERT(import != LP_POISON);
1293
1294         spin_lock(&import->imp_lock);
1295         import->imp_generation++;
1296         spin_unlock(&import->imp_lock);
1297         class_import_put(import);
1298 }
1299 EXPORT_SYMBOL(class_destroy_import);
1300
1301 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1302
1303 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1304 {
1305         spin_lock(&exp->exp_locks_list_guard);
1306
1307         LASSERT(lock->l_exp_refs_nr >= 0);
1308
1309         if (lock->l_exp_refs_target != NULL &&
1310             lock->l_exp_refs_target != exp) {
1311                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1312                               exp, lock, lock->l_exp_refs_target);
1313         }
1314         if ((lock->l_exp_refs_nr ++) == 0) {
1315                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1316                 lock->l_exp_refs_target = exp;
1317         }
1318         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1319                lock, exp, lock->l_exp_refs_nr);
1320         spin_unlock(&exp->exp_locks_list_guard);
1321 }
1322 EXPORT_SYMBOL(__class_export_add_lock_ref);
1323
1324 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1325 {
1326         spin_lock(&exp->exp_locks_list_guard);
1327         LASSERT(lock->l_exp_refs_nr > 0);
1328         if (lock->l_exp_refs_target != exp) {
1329                 LCONSOLE_WARN("lock %p, "
1330                               "mismatching export pointers: %p, %p\n",
1331                               lock, lock->l_exp_refs_target, exp);
1332         }
1333         if (-- lock->l_exp_refs_nr == 0) {
1334                 list_del_init(&lock->l_exp_refs_link);
1335                 lock->l_exp_refs_target = NULL;
1336         }
1337         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1338                lock, exp, lock->l_exp_refs_nr);
1339         spin_unlock(&exp->exp_locks_list_guard);
1340 }
1341 EXPORT_SYMBOL(__class_export_del_lock_ref);
1342 #endif
1343
1344 /* A connection defines an export context in which preallocation can
1345    be managed. This releases the export pointer reference, and returns
1346    the export handle, so the export refcount is 1 when this function
1347    returns. */
1348 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1349                   struct obd_uuid *cluuid)
1350 {
1351         struct obd_export *export;
1352         LASSERT(conn != NULL);
1353         LASSERT(obd != NULL);
1354         LASSERT(cluuid != NULL);
1355         ENTRY;
1356
1357         export = class_new_export(obd, cluuid);
1358         if (IS_ERR(export))
1359                 RETURN(PTR_ERR(export));
1360
1361         conn->cookie = export->exp_handle.h_cookie;
1362         class_export_put(export);
1363
1364         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1365                cluuid->uuid, conn->cookie);
1366         RETURN(0);
1367 }
1368 EXPORT_SYMBOL(class_connect);
1369
1370 /* if export is involved in recovery then clean up related things */
1371 static void class_export_recovery_cleanup(struct obd_export *exp)
1372 {
1373         struct obd_device *obd = exp->exp_obd;
1374
1375         spin_lock(&obd->obd_recovery_task_lock);
1376         if (obd->obd_recovering) {
1377                 if (exp->exp_in_recovery) {
1378                         spin_lock(&exp->exp_lock);
1379                         exp->exp_in_recovery = 0;
1380                         spin_unlock(&exp->exp_lock);
1381                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1382                         atomic_dec(&obd->obd_connected_clients);
1383                 }
1384
1385                 /* if called during recovery then should update
1386                  * obd_stale_clients counter,
1387                  * lightweight exports are not counted */
1388                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1389                         exp->exp_obd->obd_stale_clients++;
1390         }
1391         spin_unlock(&obd->obd_recovery_task_lock);
1392
1393         spin_lock(&exp->exp_lock);
1394         /** Cleanup req replay fields */
1395         if (exp->exp_req_replay_needed) {
1396                 exp->exp_req_replay_needed = 0;
1397
1398                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1399                 atomic_dec(&obd->obd_req_replay_clients);
1400         }
1401
1402         /** Cleanup lock replay data */
1403         if (exp->exp_lock_replay_needed) {
1404                 exp->exp_lock_replay_needed = 0;
1405
1406                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1407                 atomic_dec(&obd->obd_lock_replay_clients);
1408         }
1409         spin_unlock(&exp->exp_lock);
1410 }
1411
1412 /* This function removes 1-3 references from the export:
1413  * 1 - for export pointer passed
1414  * and if disconnect really need
1415  * 2 - removing from hash
1416  * 3 - in client_unlink_export
1417  * The export pointer passed to this function can destroyed */
1418 int class_disconnect(struct obd_export *export)
1419 {
1420         int already_disconnected;
1421         ENTRY;
1422
1423         if (export == NULL) {
1424                 CWARN("attempting to free NULL export %p\n", export);
1425                 RETURN(-EINVAL);
1426         }
1427
1428         spin_lock(&export->exp_lock);
1429         already_disconnected = export->exp_disconnected;
1430         export->exp_disconnected = 1;
1431 #ifdef HAVE_SERVER_SUPPORT
1432         /*  We hold references of export for uuid hash
1433          *  and nid_hash and export link at least. So
1434          *  it is safe to call rh*table_remove_fast in
1435          *  there.
1436          */
1437         obd_nid_del(export->exp_obd, export);
1438 #endif /* HAVE_SERVER_SUPPORT */
1439         spin_unlock(&export->exp_lock);
1440
1441         /* class_cleanup(), abort_recovery(), and class_fail_export()
1442          * all end up in here, and if any of them race we shouldn't
1443          * call extra class_export_puts(). */
1444         if (already_disconnected)
1445                 GOTO(no_disconn, already_disconnected);
1446
1447         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1448                export->exp_handle.h_cookie);
1449
1450         class_export_recovery_cleanup(export);
1451         class_unlink_export(export);
1452 no_disconn:
1453         class_export_put(export);
1454         RETURN(0);
1455 }
1456 EXPORT_SYMBOL(class_disconnect);
1457
1458 /* Return non-zero for a fully connected export */
1459 int class_connected_export(struct obd_export *exp)
1460 {
1461         int connected = 0;
1462
1463         if (exp) {
1464                 spin_lock(&exp->exp_lock);
1465                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1466                 spin_unlock(&exp->exp_lock);
1467         }
1468         return connected;
1469 }
1470 EXPORT_SYMBOL(class_connected_export);
1471
1472 static void class_disconnect_export_list(struct list_head *list,
1473                                          enum obd_option flags)
1474 {
1475         int rc;
1476         struct obd_export *exp;
1477         ENTRY;
1478
1479         /* It's possible that an export may disconnect itself, but
1480          * nothing else will be added to this list.
1481          */
1482         while ((exp = list_first_entry_or_null(list, struct obd_export,
1483                                                exp_obd_chain)) != NULL) {
1484                 /* need for safe call CDEBUG after obd_disconnect */
1485                 class_export_get(exp);
1486
1487                 spin_lock(&exp->exp_lock);
1488                 exp->exp_flags = flags;
1489                 spin_unlock(&exp->exp_lock);
1490
1491                 if (obd_uuid_equals(&exp->exp_client_uuid,
1492                                     &exp->exp_obd->obd_uuid)) {
1493                         CDEBUG(D_HA,
1494                                "exp %p export uuid == obd uuid, don't discon\n",
1495                                exp);
1496                         /* Need to delete this now so we don't end up pointing
1497                          * to work_list later when this export is cleaned up. */
1498                         list_del_init(&exp->exp_obd_chain);
1499                         class_export_put(exp);
1500                         continue;
1501                 }
1502
1503                 class_export_get(exp);
1504                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1505                        "last request at %lld\n",
1506                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1507                        exp, exp->exp_last_request_time);
1508                 /* release one export reference anyway */
1509                 rc = obd_disconnect(exp);
1510
1511                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1512                        obd_export_nid2str(exp), exp, rc);
1513                 class_export_put(exp);
1514         }
1515         EXIT;
1516 }
1517
1518 void class_disconnect_exports(struct obd_device *obd)
1519 {
1520         LIST_HEAD(work_list);
1521         ENTRY;
1522
1523         /* Move all of the exports from obd_exports to a work list, en masse. */
1524         spin_lock(&obd->obd_dev_lock);
1525         list_splice_init(&obd->obd_exports, &work_list);
1526         list_splice_init(&obd->obd_delayed_exports, &work_list);
1527         spin_unlock(&obd->obd_dev_lock);
1528
1529         if (!list_empty(&work_list)) {
1530                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1531                        "disconnecting them\n", obd->obd_minor, obd);
1532                 class_disconnect_export_list(&work_list,
1533                                              exp_flags_from_obd(obd));
1534         } else
1535                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1536                        obd->obd_minor, obd);
1537         EXIT;
1538 }
1539 EXPORT_SYMBOL(class_disconnect_exports);
1540
1541 /* Remove exports that have not completed recovery.
1542  */
1543 void class_disconnect_stale_exports(struct obd_device *obd,
1544                                     int (*test_export)(struct obd_export *))
1545 {
1546         LIST_HEAD(work_list);
1547         struct obd_export *exp, *n;
1548         int evicted = 0;
1549         ENTRY;
1550
1551         spin_lock(&obd->obd_dev_lock);
1552         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1553                                  exp_obd_chain) {
1554                 /* don't count self-export as client */
1555                 if (obd_uuid_equals(&exp->exp_client_uuid,
1556                                     &exp->exp_obd->obd_uuid))
1557                         continue;
1558
1559                 /* don't evict clients which have no slot in last_rcvd
1560                  * (e.g. lightweight connection) */
1561                 if (exp->exp_target_data.ted_lr_idx == -1)
1562                         continue;
1563
1564                 spin_lock(&exp->exp_lock);
1565                 if (exp->exp_failed || test_export(exp)) {
1566                         spin_unlock(&exp->exp_lock);
1567                         continue;
1568                 }
1569                 exp->exp_failed = 1;
1570                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1571                 spin_unlock(&exp->exp_lock);
1572
1573                 list_move(&exp->exp_obd_chain, &work_list);
1574                 evicted++;
1575                 CWARN("%s: disconnect stale client %s@%s\n",
1576                       obd->obd_name, exp->exp_client_uuid.uuid,
1577                       obd_export_nid2str(exp));
1578                 print_export_data(exp, "EVICTING", 0, D_HA);
1579         }
1580         spin_unlock(&obd->obd_dev_lock);
1581
1582         if (evicted)
1583                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1584                               obd->obd_name, evicted);
1585
1586         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1587                                                  OBD_OPT_ABORT_RECOV);
1588         EXIT;
1589 }
1590 EXPORT_SYMBOL(class_disconnect_stale_exports);
1591
1592 void class_fail_export(struct obd_export *exp)
1593 {
1594         int rc, already_failed;
1595
1596         spin_lock(&exp->exp_lock);
1597         already_failed = exp->exp_failed;
1598         exp->exp_failed = 1;
1599         spin_unlock(&exp->exp_lock);
1600
1601         if (already_failed) {
1602                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1603                        exp, exp->exp_client_uuid.uuid);
1604                 return;
1605         }
1606
1607         atomic_inc(&exp->exp_obd->obd_eviction_count);
1608
1609         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1610                exp, exp->exp_client_uuid.uuid);
1611
1612         if (obd_dump_on_timeout)
1613                 libcfs_debug_dumplog();
1614
1615         /* need for safe call CDEBUG after obd_disconnect */
1616         class_export_get(exp);
1617
1618         /* Most callers into obd_disconnect are removing their own reference
1619          * (request, for example) in addition to the one from the hash table.
1620          * We don't have such a reference here, so make one. */
1621         class_export_get(exp);
1622         rc = obd_disconnect(exp);
1623         if (rc)
1624                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1625         else
1626                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1627                        exp, exp->exp_client_uuid.uuid);
1628         class_export_put(exp);
1629 }
1630 EXPORT_SYMBOL(class_fail_export);
1631
1632 #ifdef HAVE_SERVER_SUPPORT
1633
1634 static int take_first(struct obd_export *exp, void *data)
1635 {
1636         struct obd_export **expp = data;
1637
1638         if (*expp)
1639                 /* already have one */
1640                 return 0;
1641         if (exp->exp_failed)
1642                 /* Don't want this one */
1643                 return 0;
1644         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1645                 /* Cannot get a ref on this one */
1646                 return 0;
1647         *expp = exp;
1648         return 1;
1649 }
1650
1651 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1652 {
1653         struct lnet_nid nid_key;
1654         struct obd_export *doomed_exp;
1655         int exports_evicted = 0;
1656
1657         libcfs_strnid(&nid_key, nid);
1658
1659         spin_lock(&obd->obd_dev_lock);
1660         /* umount has run already, so evict thread should leave
1661          * its task to umount thread now */
1662         if (obd->obd_stopping) {
1663                 spin_unlock(&obd->obd_dev_lock);
1664                 return exports_evicted;
1665         }
1666         spin_unlock(&obd->obd_dev_lock);
1667
1668         doomed_exp = NULL;
1669         while (obd_nid_export_for_each(obd, &nid_key,
1670                                        take_first, &doomed_exp) > 0) {
1671
1672                 LASSERTF(doomed_exp != obd->obd_self_export,
1673                          "self-export is hashed by NID?\n");
1674
1675                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1676                               obd->obd_name,
1677                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1678                               obd_export_nid2str(doomed_exp));
1679
1680                 class_fail_export(doomed_exp);
1681                 class_export_put(doomed_exp);
1682                 exports_evicted++;
1683                 doomed_exp = NULL;
1684         }
1685
1686         if (!exports_evicted)
1687                 CDEBUG(D_HA,
1688                        "%s: can't disconnect NID '%s': no exports found\n",
1689                        obd->obd_name, nid);
1690         return exports_evicted;
1691 }
1692 EXPORT_SYMBOL(obd_export_evict_by_nid);
1693
1694 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1695 {
1696         struct obd_export *doomed_exp = NULL;
1697         struct obd_uuid doomed_uuid;
1698         int exports_evicted = 0;
1699
1700         spin_lock(&obd->obd_dev_lock);
1701         if (obd->obd_stopping) {
1702                 spin_unlock(&obd->obd_dev_lock);
1703                 return exports_evicted;
1704         }
1705         spin_unlock(&obd->obd_dev_lock);
1706
1707         obd_str2uuid(&doomed_uuid, uuid);
1708         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1709                 CERROR("%s: can't evict myself\n", obd->obd_name);
1710                 return exports_evicted;
1711         }
1712
1713         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1714         if (doomed_exp == NULL) {
1715                 CERROR("%s: can't disconnect %s: no exports found\n",
1716                        obd->obd_name, uuid);
1717         } else {
1718                 CWARN("%s: evicting %s at adminstrative request\n",
1719                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1720                 class_fail_export(doomed_exp);
1721                 class_export_put(doomed_exp);
1722                 obd_uuid_del(obd, doomed_exp);
1723                 exports_evicted++;
1724         }
1725
1726         return exports_evicted;
1727 }
1728 #endif /* HAVE_SERVER_SUPPORT */
1729
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() invokes it, when
 * set, for callers that asked for lock information.  Unset (NULL) by default.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1734
1735 static void print_export_data(struct obd_export *exp, const char *status,
1736                               int locks, int debug_level)
1737 {
1738         struct ptlrpc_reply_state *rs;
1739         struct ptlrpc_reply_state *first_reply = NULL;
1740         int nreplies = 0;
1741
1742         spin_lock(&exp->exp_lock);
1743         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1744                             rs_exp_list) {
1745                 if (nreplies == 0)
1746                         first_reply = rs;
1747                 nreplies++;
1748         }
1749         spin_unlock(&exp->exp_lock);
1750
1751         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1752                "%p %s %llu stale:%d\n",
1753                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1754                obd_export_nid2str(exp),
1755                refcount_read(&exp->exp_handle.h_ref),
1756                atomic_read(&exp->exp_rpc_count),
1757                atomic_read(&exp->exp_cb_count),
1758                atomic_read(&exp->exp_locks_count),
1759                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1760                nreplies, first_reply, nreplies > 3 ? "..." : "",
1761                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1762 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1763         if (locks && class_export_dump_hook != NULL)
1764                 class_export_dump_hook(exp);
1765 #endif
1766 }
1767
1768 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1769 {
1770         struct obd_export *exp;
1771
1772         spin_lock(&obd->obd_dev_lock);
1773         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1774                 print_export_data(exp, "ACTIVE", locks, debug_level);
1775         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1776                 print_export_data(exp, "UNLINKED", locks, debug_level);
1777         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1778                 print_export_data(exp, "DELAYED", locks, debug_level);
1779         spin_unlock(&obd->obd_dev_lock);
1780 }
1781
1782 void obd_exports_barrier(struct obd_device *obd)
1783 {
1784         int waited = 2;
1785         LASSERT(list_empty(&obd->obd_exports));
1786         spin_lock(&obd->obd_dev_lock);
1787         while (!list_empty(&obd->obd_unlinked_exports)) {
1788                 spin_unlock(&obd->obd_dev_lock);
1789                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1790                 if (waited > 5 && is_power_of_2(waited)) {
1791                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1792                                       "more than %d seconds. "
1793                                       "The obd refcount = %d. Is it stuck?\n",
1794                                       obd->obd_name, waited,
1795                                       atomic_read(&obd->obd_refcount));
1796                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1797                 }
1798                 waited *= 2;
1799                 spin_lock(&obd->obd_dev_lock);
1800         }
1801         spin_unlock(&obd->obd_dev_lock);
1802 }
1803 EXPORT_SYMBOL(obd_exports_barrier);
1804
1805 /**
1806  * Add export to the obd_zombe thread and notify it.
1807  */
1808 static void obd_zombie_export_add(struct obd_export *exp) {
1809         atomic_dec(&obd_stale_export_num);
1810         spin_lock(&exp->exp_obd->obd_dev_lock);
1811         LASSERT(!list_empty(&exp->exp_obd_chain));
1812         list_del_init(&exp->exp_obd_chain);
1813         spin_unlock(&exp->exp_obd->obd_dev_lock);
1814
1815         queue_work(zombie_wq, &exp->exp_zombie_work);
1816 }
1817
1818 /**
1819  * Add import to the obd_zombe thread and notify it.
1820  */
1821 static void obd_zombie_import_add(struct obd_import *imp) {
1822         LASSERT(imp->imp_sec == NULL);
1823
1824         queue_work(zombie_wq, &imp->imp_zombie_work);
1825 }
1826
1827 /**
1828  * wait when obd_zombie import/export queues become empty
1829  */
1830 void obd_zombie_barrier(void)
1831 {
1832         flush_workqueue(zombie_wq);
1833 }
1834 EXPORT_SYMBOL(obd_zombie_barrier);
1835
1836
1837 struct obd_export *obd_stale_export_get(void)
1838 {
1839         struct obd_export *exp = NULL;
1840         ENTRY;
1841
1842         spin_lock(&obd_stale_export_lock);
1843         if (!list_empty(&obd_stale_exports)) {
1844                 exp = list_first_entry(&obd_stale_exports,
1845                                        struct obd_export, exp_stale_list);
1846                 list_del_init(&exp->exp_stale_list);
1847         }
1848         spin_unlock(&obd_stale_export_lock);
1849
1850         if (exp) {
1851                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1852                        atomic_read(&obd_stale_export_num));
1853         }
1854         RETURN(exp);
1855 }
1856 EXPORT_SYMBOL(obd_stale_export_get);
1857
1858 void obd_stale_export_put(struct obd_export *exp)
1859 {
1860         ENTRY;
1861
1862         LASSERT(list_empty(&exp->exp_stale_list));
1863         if (exp->exp_lock_hash &&
1864             atomic_read(&exp->exp_lock_hash->hs_count)) {
1865                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1866                        atomic_read(&obd_stale_export_num));
1867
1868                 spin_lock_bh(&exp->exp_bl_list_lock);
1869                 spin_lock(&obd_stale_export_lock);
1870                 /* Add to the tail if there is no blocked locks,
1871                  * to the head otherwise. */
1872                 if (list_empty(&exp->exp_bl_list))
1873                         list_add_tail(&exp->exp_stale_list,
1874                                       &obd_stale_exports);
1875                 else
1876                         list_add(&exp->exp_stale_list,
1877                                  &obd_stale_exports);
1878
1879                 spin_unlock(&obd_stale_export_lock);
1880                 spin_unlock_bh(&exp->exp_bl_list_lock);
1881         } else {
1882                 class_export_put(exp);
1883         }
1884         EXIT;
1885 }
1886 EXPORT_SYMBOL(obd_stale_export_put);
1887
1888 /**
1889  * Adjust the position of the export in the stale list,
1890  * i.e. move to the head of the list if is needed.
1891  **/
1892 void obd_stale_export_adjust(struct obd_export *exp)
1893 {
1894         LASSERT(exp != NULL);
1895         spin_lock_bh(&exp->exp_bl_list_lock);
1896         spin_lock(&obd_stale_export_lock);
1897
1898         if (!list_empty(&exp->exp_stale_list) &&
1899             !list_empty(&exp->exp_bl_list))
1900                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1901
1902         spin_unlock(&obd_stale_export_lock);
1903         spin_unlock_bh(&exp->exp_bl_list_lock);
1904 }
1905 EXPORT_SYMBOL(obd_stale_export_adjust);
1906
1907 /**
1908  * start destroy zombie import/export thread
1909  */
1910 int obd_zombie_impexp_init(void)
1911 {
1912         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1913                                            0, CFS_CPT_ANY,
1914                                            cfs_cpt_number(cfs_cpt_tab));
1915
1916         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1917 }
1918
1919 /**
1920  * stop destroy zombie import/export thread
1921  */
1922 void obd_zombie_impexp_stop(void)
1923 {
1924         destroy_workqueue(zombie_wq);
1925         LASSERT(list_empty(&obd_stale_exports));
1926 }
1927
1928 /***** Kernel-userspace comm helpers *******/
1929
1930 /* Get length of entire message, including header */
1931 int kuc_len(int payload_len)
1932 {
1933         return sizeof(struct kuc_hdr) + payload_len;
1934 }
1935 EXPORT_SYMBOL(kuc_len);
1936
1937 /* Get a pointer to kuc header, given a ptr to the payload
1938  * @param p Pointer to payload area
1939  * @returns Pointer to kuc header
1940  */
1941 struct kuc_hdr * kuc_ptr(void *p)
1942 {
1943         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1944         LASSERT(lh->kuc_magic == KUC_MAGIC);
1945         return lh;
1946 }
1947 EXPORT_SYMBOL(kuc_ptr);
1948
1949 /* Alloc space for a message, and fill in header
1950  * @return Pointer to payload area
1951  */
1952 void *kuc_alloc(int payload_len, int transport, int type)
1953 {
1954         struct kuc_hdr *lh;
1955         int len = kuc_len(payload_len);
1956
1957         OBD_ALLOC(lh, len);
1958         if (lh == NULL)
1959                 return ERR_PTR(-ENOMEM);
1960
1961         lh->kuc_magic = KUC_MAGIC;
1962         lh->kuc_transport = transport;
1963         lh->kuc_msgtype = type;
1964         lh->kuc_msglen = len;
1965
1966         return (void *)(lh + 1);
1967 }
1968 EXPORT_SYMBOL(kuc_alloc);
1969
/* Free a message given a pointer to its payload area and the payload length
 * (the same length that was passed when the message was sized).
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1977
1978 struct obd_request_slot_waiter {
1979         struct list_head        orsw_entry;
1980         wait_queue_head_t       orsw_waitq;
1981         bool                    orsw_signaled;
1982 };
1983
1984 static bool obd_request_slot_avail(struct client_obd *cli,
1985                                    struct obd_request_slot_waiter *orsw)
1986 {
1987         bool avail;
1988
1989         spin_lock(&cli->cl_loi_list_lock);
1990         avail = !!list_empty(&orsw->orsw_entry);
1991         spin_unlock(&cli->cl_loi_list_lock);
1992
1993         return avail;
1994 };
1995
1996 /*
1997  * For network flow control, the RPC sponsor needs to acquire a credit
1998  * before sending the RPC. The credits count for a connection is defined
1999  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2000  * the subsequent RPC sponsors need to wait until others released their
2001  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2002  */
2003 int obd_get_request_slot(struct client_obd *cli)
2004 {
2005         struct obd_request_slot_waiter   orsw;
2006         int                              rc;
2007
2008         spin_lock(&cli->cl_loi_list_lock);
2009         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2010                 cli->cl_rpcs_in_flight++;
2011                 spin_unlock(&cli->cl_loi_list_lock);
2012                 return 0;
2013         }
2014
2015         init_waitqueue_head(&orsw.orsw_waitq);
2016         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2017         orsw.orsw_signaled = false;
2018         spin_unlock(&cli->cl_loi_list_lock);
2019
2020         rc = l_wait_event_abortable(orsw.orsw_waitq,
2021                                     obd_request_slot_avail(cli, &orsw) ||
2022                                     orsw.orsw_signaled);
2023
2024         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2025          * freed but other (such as obd_put_request_slot) is using it. */
2026         spin_lock(&cli->cl_loi_list_lock);
2027         if (rc != 0) {
2028                 if (!orsw.orsw_signaled) {
2029                         if (list_empty(&orsw.orsw_entry))
2030                                 cli->cl_rpcs_in_flight--;
2031                         else
2032                                 list_del(&orsw.orsw_entry);
2033                 }
2034                 rc = -EINTR;
2035         }
2036
2037         if (orsw.orsw_signaled) {
2038                 LASSERT(list_empty(&orsw.orsw_entry));
2039
2040                 rc = -EINTR;
2041         }
2042         spin_unlock(&cli->cl_loi_list_lock);
2043
2044         return rc;
2045 }
2046 EXPORT_SYMBOL(obd_get_request_slot);
2047
2048 void obd_put_request_slot(struct client_obd *cli)
2049 {
2050         struct obd_request_slot_waiter *orsw;
2051
2052         spin_lock(&cli->cl_loi_list_lock);
2053         cli->cl_rpcs_in_flight--;
2054
2055         /* If there is free slot, wakeup the first waiter. */
2056         if (!list_empty(&cli->cl_flight_waiters) &&
2057             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2058                 orsw = list_first_entry(&cli->cl_flight_waiters,
2059                                         struct obd_request_slot_waiter,
2060                                         orsw_entry);
2061                 list_del_init(&orsw->orsw_entry);
2062                 cli->cl_rpcs_in_flight++;
2063                 wake_up(&orsw->orsw_waitq);
2064         }
2065         spin_unlock(&cli->cl_loi_list_lock);
2066 }
2067 EXPORT_SYMBOL(obd_put_request_slot);
2068
2069 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2070 {
2071         return cli->cl_max_rpcs_in_flight;
2072 }
2073 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2074
2075 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2076 {
2077         struct obd_request_slot_waiter *orsw;
2078         __u32                           old;
2079         int                             diff;
2080         int                             i;
2081         int                             rc;
2082
2083         if (max > OBD_MAX_RIF_MAX || max < 1)
2084                 return -ERANGE;
2085
2086         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2087                cli->cl_import->imp_obd->obd_name, max,
2088                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2089
2090         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2091                    LUSTRE_MDC_NAME) == 0) {
2092                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2093                  * strictly lower that max_rpcs_in_flight */
2094                 if (max < 2) {
2095                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2096                                cli->cl_import->imp_obd->obd_name);
2097                         return -ERANGE;
2098                 }
2099                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2100                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2101                         if (rc != 0)
2102                                 return rc;
2103                 }
2104         }
2105
2106         spin_lock(&cli->cl_loi_list_lock);
2107         old = cli->cl_max_rpcs_in_flight;
2108         cli->cl_max_rpcs_in_flight = max;
2109         client_adjust_max_dirty(cli);
2110
2111         diff = max - old;
2112
2113         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2114         for (i = 0; i < diff; i++) {
2115                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2116                                                 struct obd_request_slot_waiter,
2117                                                 orsw_entry);
2118                 if (!orsw)
2119                         break;
2120
2121                 list_del_init(&orsw->orsw_entry);
2122                 cli->cl_rpcs_in_flight++;
2123                 wake_up(&orsw->orsw_waitq);
2124         }
2125         spin_unlock(&cli->cl_loi_list_lock);
2126
2127         return 0;
2128 }
2129 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2130
2131 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2132 {
2133         return cli->cl_max_mod_rpcs_in_flight;
2134 }
2135 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2136
2137 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2138 {
2139         struct obd_connect_data *ocd;
2140         __u16 maxmodrpcs;
2141         __u16 prev;
2142
2143         if (max > OBD_MAX_RIF_MAX || max < 1)
2144                 return -ERANGE;
2145
2146         ocd = &cli->cl_import->imp_connect_data;
2147         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2148                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2149                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2150
2151         if (max == OBD_MAX_RIF_MAX)
2152                 max = OBD_MAX_RIF_MAX - 1;
2153
2154         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2155          * increase this value, also bump up max_rpcs_in_flight to match.
2156          */
2157         if (max >= cli->cl_max_rpcs_in_flight) {
2158                 CDEBUG(D_INFO,
2159                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2160                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2161                 obd_set_max_rpcs_in_flight(cli, max + 1);
2162         }
2163
2164         /* cannot exceed max modify RPCs in flight supported by the server,
2165          * but verify ocd_connect_flags is at least initialized first.  If
2166          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2167          */
2168         if (!ocd->ocd_connect_flags) {
2169                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2170         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2171                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2172                 if (maxmodrpcs == 0) { /* connection not finished yet */
2173                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2174                         CDEBUG(D_INFO,
2175                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2176                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2177                 }
2178         } else {
2179                 maxmodrpcs = 1;
2180         }
2181         if (max > maxmodrpcs) {
2182                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2183                        cli->cl_import->imp_obd->obd_name,
2184                        max, maxmodrpcs);
2185                 return -ERANGE;
2186         }
2187
2188         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2189
2190         prev = cli->cl_max_mod_rpcs_in_flight;
2191         cli->cl_max_mod_rpcs_in_flight = max;
2192
2193         /* wakeup waiters if limit has been increased */
2194         if (cli->cl_max_mod_rpcs_in_flight > prev)
2195                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2196
2197         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2198
2199         return 0;
2200 }
2201 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2202
2203 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2204                                struct seq_file *seq)
2205 {
2206         unsigned long mod_tot = 0, mod_cum;
2207         int i;
2208
2209         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2210         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2211                              ":", true, "");
2212         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2213                    cli->cl_mod_rpcs_in_flight);
2214
2215         seq_printf(seq, "\n\t\t\tmodify\n");
2216         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2217
2218         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2219
2220         mod_cum = 0;
2221         for (i = 0; i < OBD_HIST_MAX; i++) {
2222                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2223
2224                 mod_cum += mod;
2225                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2226                            i, mod, pct(mod, mod_tot),
2227                            pct(mod_cum, mod_tot));
2228                 if (mod_cum == mod_tot)
2229                         break;
2230         }
2231
2232         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2233
2234         return 0;
2235 }
2236 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2237
2238 /* The number of modify RPCs sent in parallel is limited
2239  * because the server has a finite number of slots per client to
2240  * store request result and ensure reply reconstruction when needed.
2241  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2242  * that takes into account server limit and cl_max_rpcs_in_flight
2243  * value.
2244  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2245  * one close request is allowed above the maximum.
2246  */
2247 struct mod_waiter {
2248         struct client_obd *cli;
2249         bool close_req;
2250         bool woken;
2251         wait_queue_entry_t wqe;
2252 };
2253 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2254                                   unsigned int mode, int flags, void *key)
2255 {
2256         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2257         struct client_obd *cli = w->cli;
2258         bool close_req = w->close_req;
2259         bool avail;
2260         int ret;
2261
2262         /* As woken_wake_function() doesn't remove us from the wait_queue,
2263          * we use own flag to ensure we're called just once.
2264          */
2265         if (w->woken)
2266                 return 0;
2267
2268         /* A slot is available if
2269          * - number of modify RPCs in flight is less than the max
2270          * - it's a close RPC and no other close request is in flight
2271          */
2272         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2273                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2274         if (avail) {
2275                 cli->cl_mod_rpcs_in_flight++;
2276                 if (w->close_req)
2277                         cli->cl_close_rpcs_in_flight++;
2278                 ret = woken_wake_function(wq_entry, mode, flags, key);
2279                 w->woken = true;
2280         } else if (cli->cl_close_rpcs_in_flight)
2281                 /* No other waiter could be woken */
2282                 ret = -1;
2283         else if (key == NULL)
2284                 /* This was not a wakeup from a close completion, so there is no
2285                  * point seeing if there are close waiters to be woken
2286                  */
2287                 ret = -1;
2288         else
2289                 /* There might be be a close we could wake, keep looking */
2290                 ret = 0;
2291         return ret;
2292 }
2293
2294 /* Get a modify RPC slot from the obd client @cli according
2295  * to the kind of operation @opc that is going to be sent
2296  * and the intent @it of the operation if it applies.
2297  * If the maximum number of modify RPCs in flight is reached
2298  * the thread is put to sleep.
2299  * Returns the tag to be set in the request message. Tag 0
2300  * is reserved for non-modifying requests.
2301  */
2302 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2303 {
2304         struct mod_waiter wait = {
2305                 .cli = cli,
2306                 .close_req = (opc == MDS_CLOSE),
2307                 .woken = false,
2308         };
2309         __u16                   i, max;
2310
2311         init_wait(&wait.wqe);
2312         wait.wqe.func = claim_mod_rpc_function;
2313
2314         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2315         __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2316         /* This wakeup will only succeed if the maximums haven't
2317          * been reached.  If that happens, WQ_FLAG_WOKEN will be cleared
2318          * and there will be no need to wait.
2319          */
2320         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2321         /* XXX: handle spurious wakeups (from unknown yet source */
2322         while (wait.woken == false) {
2323                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2324                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2325                            MAX_SCHEDULE_TIMEOUT);
2326                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2327         }
2328         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2329
2330         max = cli->cl_max_mod_rpcs_in_flight;
2331         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2332                          cli->cl_mod_rpcs_in_flight);
2333         /* find a free tag */
2334         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2335                                 max + 1);
2336         LASSERT(i < OBD_MAX_RIF_MAX);
2337         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2338         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2339         /* tag 0 is reserved for non-modify RPCs */
2340
2341         CDEBUG(D_RPCTRACE,
2342                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2343                cli->cl_import->imp_obd->obd_name,
2344                i + 1, opc, max);
2345
2346         return i + 1;
2347 }
2348 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2349
2350 /* Put a modify RPC slot from the obd client @cli according
2351  * to the kind of operation @opc that has been sent.
2352  */
2353 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2354 {
2355         bool                    close_req = false;
2356
2357         if (tag == 0)
2358                 return;
2359
2360         if (opc == MDS_CLOSE)
2361                 close_req = true;
2362
2363         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2364         cli->cl_mod_rpcs_in_flight--;
2365         if (close_req)
2366                 cli->cl_close_rpcs_in_flight--;
2367         /* release the tag in the bitmap */
2368         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2369         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2370         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2371                              (void *)close_req);
2372         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2373 }
2374 EXPORT_SYMBOL(obd_put_mod_rpc_slot);