Whamcloud - gitweb
LU-14161 obdclass: fix some problems with obd_nid_hash
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct kobject *kobj = kset_find_obj(lustre_kset, name);
97
98         if (kobj && kobj->ktype == &class_ktype)
99                 return container_of(kobj, struct obd_type, typ_kobj);
100
101         kobject_put(kobj);
102         return NULL;
103 }
104 EXPORT_SYMBOL(class_search_type);
105
106 struct obd_type *class_get_type(const char *name)
107 {
108         struct obd_type *type;
109
110         type = class_search_type(name);
111 #ifdef HAVE_MODULE_LOADING_SUPPORT
112         if (!type) {
113                 const char *modname = name;
114
115 #ifdef HAVE_SERVER_SUPPORT
116                 if (strcmp(modname, "obdfilter") == 0)
117                         modname = "ofd";
118
119                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
120                         modname = LUSTRE_OSP_NAME;
121
122                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
123                         modname = LUSTRE_MDT_NAME;
124 #endif /* HAVE_SERVER_SUPPORT */
125
126                 if (!request_module("%s", modname)) {
127                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128                         type = class_search_type(name);
129                 } else {
130                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
131                                            modname);
132                 }
133         }
134 #endif
135         if (type) {
136                 if (try_module_get(type->typ_dt_ops->o_owner)) {
137                         atomic_inc(&type->typ_refcnt);
138                         /* class_search_type() returned a counted reference,
139                          * but we don't need that count any more as
140                          * we have one through typ_refcnt.
141                          */
142                         kobject_put(&type->typ_kobj);
143                 } else {
144                         kobject_put(&type->typ_kobj);
145                         type = NULL;
146                 }
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157
158 static void class_sysfs_release(struct kobject *kobj)
159 {
160         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161
162         debugfs_remove_recursive(type->typ_debugfs_entry);
163         type->typ_debugfs_entry = NULL;
164
165         if (type->typ_lu)
166                 lu_device_type_fini(type->typ_lu);
167
168 #ifdef CONFIG_PROC_FS
169         if (type->typ_name && type->typ_procroot)
170                 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 #endif
172         OBD_FREE(type, sizeof(*type));
173 }
174
175 static struct kobj_type class_ktype = {
176         .sysfs_ops      = &lustre_sysfs_ops,
177         .release        = class_sysfs_release,
178 };
179
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
182 {
183         struct dentry *symlink;
184         struct obd_type *type;
185         int rc;
186
187         type = class_search_type(name);
188         if (type) {
189                 kobject_put(&type->typ_kobj);
190                 return ERR_PTR(-EEXIST);
191         }
192
193         OBD_ALLOC(type, sizeof(*type));
194         if (!type)
195                 return ERR_PTR(-ENOMEM);
196
197         type->typ_kobj.kset = lustre_kset;
198         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199                                   &lustre_kset->kobj, "%s", name);
200         if (rc)
201                 return ERR_PTR(rc);
202
203         symlink = debugfs_create_dir(name, debugfs_lustre_root);
204         type->typ_debugfs_entry = symlink;
205         type->typ_sym_filter = true;
206
207         if (enable_proc) {
208                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209                                                       NULL, NULL);
210                 if (IS_ERR(type->typ_procroot)) {
211                         CERROR("%s: can't create compat proc entry: %d\n",
212                                name, (int)PTR_ERR(type->typ_procroot));
213                         type->typ_procroot = NULL;
214                 }
215         }
216
217         return type;
218 }
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
221
222 #define CLASS_MAX_NAME 1024
223
224 int class_register_type(const struct obd_ops *dt_ops,
225                         const struct md_ops *md_ops,
226                         bool enable_proc, struct ldebugfs_vars *vars,
227                         const char *name, struct lu_device_type *ldt)
228 {
229         struct obd_type *type;
230         int rc;
231
232         ENTRY;
233         /* sanity check */
234         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
235
236         type = class_search_type(name);
237         if (type) {
238 #ifdef HAVE_SERVER_SUPPORT
239                 if (type->typ_sym_filter)
240                         goto dir_exist;
241 #endif /* HAVE_SERVER_SUPPORT */
242                 kobject_put(&type->typ_kobj);
243                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
244                 RETURN(-EEXIST);
245         }
246
247         OBD_ALLOC(type, sizeof(*type));
248         if (type == NULL)
249                 RETURN(-ENOMEM);
250
251         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
252         type->typ_kobj.kset = lustre_kset;
253         kobject_init(&type->typ_kobj, &class_ktype);
254 #ifdef HAVE_SERVER_SUPPORT
255 dir_exist:
256 #endif /* HAVE_SERVER_SUPPORT */
257
258         type->typ_dt_ops = dt_ops;
259         type->typ_md_ops = md_ops;
260
261 #ifdef HAVE_SERVER_SUPPORT
262         if (type->typ_sym_filter) {
263                 type->typ_sym_filter = false;
264                 kobject_put(&type->typ_kobj);
265                 goto setup_ldt;
266         }
267 #endif
268 #ifdef CONFIG_PROC_FS
269         if (enable_proc && !type->typ_procroot) {
270                 type->typ_procroot = lprocfs_register(name,
271                                                       proc_lustre_root,
272                                                       NULL, type);
273                 if (IS_ERR(type->typ_procroot)) {
274                         rc = PTR_ERR(type->typ_procroot);
275                         type->typ_procroot = NULL;
276                         GOTO(failed, rc);
277                 }
278         }
279 #endif
280         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
281         ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
383         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
384         INIT_LIST_HEAD(&newdev->obd_exports_timed);
385         INIT_LIST_HEAD(&newdev->obd_nid_stats);
386         spin_lock_init(&newdev->obd_nid_lock);
387         spin_lock_init(&newdev->obd_dev_lock);
388         mutex_init(&newdev->obd_dev_mutex);
389         spin_lock_init(&newdev->obd_osfs_lock);
390         /* newdev->obd_osfs_age must be set to a value in the distant
391          * past to guarantee a fresh statfs is fetched on mount. */
392         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
393
394         /* XXX belongs in setup not attach  */
395         init_rwsem(&newdev->obd_observer_link_sem);
396         /* recovery data */
397         spin_lock_init(&newdev->obd_recovery_task_lock);
398         init_waitqueue_head(&newdev->obd_next_transno_waitq);
399         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
400         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403         INIT_LIST_HEAD(&newdev->obd_evict_list);
404         INIT_LIST_HEAD(&newdev->obd_lwp_list);
405
406         llog_group_init(&newdev->obd_olg);
407         /* Detach drops this */
408         atomic_set(&newdev->obd_refcount, 1);
409         lu_ref_init(&newdev->obd_reference);
410         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
411
412         newdev->obd_conn_inprogress = 0;
413
414         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
415
416         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417                newdev->obd_name, newdev);
418
419         return newdev;
420 }
421
422 /**
423  * Free obd device.
424  *
425  * \param[in] obd obd_device to be freed
426  *
427  * \retval none
428  */
429 void class_free_dev(struct obd_device *obd)
430 {
431         struct obd_type *obd_type = obd->obd_type;
432
433         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
436                  "obd %p != obd_devs[%d] %p\n",
437                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
438         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
439                  "obd_refcount should be 0, not %d\n",
440                  atomic_read(&obd->obd_refcount));
441         LASSERT(obd_type != NULL);
442
443         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444                obd->obd_name, obd->obd_type->typ_name);
445
446         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447                          obd->obd_name, obd->obd_uuid.uuid);
448         if (obd->obd_stopping) {
449                 int err;
450
451                 /* If we're not stopping, we were never set up */
452                 err = obd_cleanup(obd);
453                 if (err)
454                         CERROR("Cleanup %s returned %d\n",
455                                 obd->obd_name, err);
456         }
457
458         obd_device_free(obd);
459
460         class_put_type(obd_type);
461 }
462
463 /**
464  * Unregister obd device.
465  *
466  * Free slot in obd_dev[] used by \a obd.
467  *
468  * \param[in] new_obd obd_device to be unregistered
469  *
470  * \retval none
471  */
472 void class_unregister_device(struct obd_device *obd)
473 {
474         write_lock(&obd_dev_lock);
475         if (obd->obd_minor >= 0) {
476                 LASSERT(obd_devs[obd->obd_minor] == obd);
477                 obd_devs[obd->obd_minor] = NULL;
478                 obd->obd_minor = -1;
479         }
480         write_unlock(&obd_dev_lock);
481 }
482
483 /**
484  * Register obd device.
485  *
486  * Find free slot in obd_devs[], fills it with \a new_obd.
487  *
488  * \param[in] new_obd obd_device to be registered
489  *
490  * \retval 0          success
491  * \retval -EEXIST    device with this name is registered
492  * \retval -EOVERFLOW obd_devs[] is full
493  */
494 int class_register_device(struct obd_device *new_obd)
495 {
496         int ret = 0;
497         int i;
498         int new_obd_minor = 0;
499         bool minor_assign = false;
500         bool retried = false;
501
502 again:
503         write_lock(&obd_dev_lock);
504         for (i = 0; i < class_devno_max(); i++) {
505                 struct obd_device *obd = class_num2obd(i);
506
507                 if (obd != NULL &&
508                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
509
510                         if (!retried) {
511                                 write_unlock(&obd_dev_lock);
512
513                                 /* the obd_device could be waited to be
514                                  * destroyed by the "obd_zombie_impexp_thread".
515                                  */
516                                 obd_zombie_barrier();
517                                 retried = true;
518                                 goto again;
519                         }
520
521                         CERROR("%s: already exists, won't add\n",
522                                obd->obd_name);
523                         /* in case we found a free slot before duplicate */
524                         minor_assign = false;
525                         ret = -EEXIST;
526                         break;
527                 }
528                 if (!minor_assign && obd == NULL) {
529                         new_obd_minor = i;
530                         minor_assign = true;
531                 }
532         }
533
534         if (minor_assign) {
535                 new_obd->obd_minor = new_obd_minor;
536                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
537                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
538                 obd_devs[new_obd_minor] = new_obd;
539         } else {
540                 if (ret == 0) {
541                         ret = -EOVERFLOW;
542                         CERROR("%s: all %u/%u devices used, increase "
543                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
544                                i, class_devno_max(), ret);
545                 }
546         }
547         write_unlock(&obd_dev_lock);
548
549         RETURN(ret);
550 }
551
552 static int class_name2dev_nolock(const char *name)
553 {
554         int i;
555
556         if (!name)
557                 return -1;
558
559         for (i = 0; i < class_devno_max(); i++) {
560                 struct obd_device *obd = class_num2obd(i);
561
562                 if (obd && strcmp(name, obd->obd_name) == 0) {
563                         /* Make sure we finished attaching before we give
564                            out any references */
565                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
566                         if (obd->obd_attached) {
567                                 return i;
568                         }
569                         break;
570                 }
571         }
572
573         return -1;
574 }
575
576 int class_name2dev(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         read_lock(&obd_dev_lock);
584         i = class_name2dev_nolock(name);
585         read_unlock(&obd_dev_lock);
586
587         return i;
588 }
589 EXPORT_SYMBOL(class_name2dev);
590
591 struct obd_device *class_name2obd(const char *name)
592 {
593         int dev = class_name2dev(name);
594
595         if (dev < 0 || dev > class_devno_max())
596                 return NULL;
597         return class_num2obd(dev);
598 }
599 EXPORT_SYMBOL(class_name2obd);
600
601 int class_uuid2dev_nolock(struct obd_uuid *uuid)
602 {
603         int i;
604
605         for (i = 0; i < class_devno_max(); i++) {
606                 struct obd_device *obd = class_num2obd(i);
607
608                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
609                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
610                         return i;
611                 }
612         }
613
614         return -1;
615 }
616
617 int class_uuid2dev(struct obd_uuid *uuid)
618 {
619         int i;
620
621         read_lock(&obd_dev_lock);
622         i = class_uuid2dev_nolock(uuid);
623         read_unlock(&obd_dev_lock);
624
625         return i;
626 }
627 EXPORT_SYMBOL(class_uuid2dev);
628
629 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
630 {
631         int dev = class_uuid2dev(uuid);
632         if (dev < 0)
633                 return NULL;
634         return class_num2obd(dev);
635 }
636 EXPORT_SYMBOL(class_uuid2obd);
637
638 /**
639  * Get obd device from ::obd_devs[]
640  *
641  * \param num [in] array index
642  *
643  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
644  *         otherwise return the obd device there.
645  */
646 struct obd_device *class_num2obd(int num)
647 {
648         struct obd_device *obd = NULL;
649
650         if (num < class_devno_max()) {
651                 obd = obd_devs[num];
652                 if (obd == NULL)
653                         return NULL;
654
655                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
656                          "%p obd_magic %08x != %08x\n",
657                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
658                 LASSERTF(obd->obd_minor == num,
659                          "%p obd_minor %0d != %0d\n",
660                          obd, obd->obd_minor, num);
661         }
662
663         return obd;
664 }
665 EXPORT_SYMBOL(class_num2obd);
666
667 /**
668  * Find obd in obd_dev[] by name or uuid.
669  *
670  * Increment obd's refcount if found.
671  *
672  * \param[in] str obd name or uuid
673  *
674  * \retval NULL    if not found
675  * \retval target  pointer to found obd_device
676  */
677 struct obd_device *class_dev_by_str(const char *str)
678 {
679         struct obd_device *target = NULL;
680         struct obd_uuid tgtuuid;
681         int rc;
682
683         obd_str2uuid(&tgtuuid, str);
684
685         read_lock(&obd_dev_lock);
686         rc = class_uuid2dev_nolock(&tgtuuid);
687         if (rc < 0)
688                 rc = class_name2dev_nolock(str);
689
690         if (rc >= 0)
691                 target = class_num2obd(rc);
692
693         if (target != NULL)
694                 class_incref(target, "find", current);
695         read_unlock(&obd_dev_lock);
696
697         RETURN(target);
698 }
699 EXPORT_SYMBOL(class_dev_by_str);
700
701 /**
702  * Get obd devices count. Device in any
703  *    state are counted
704  * \retval obd device count
705  */
706 int get_devices_count(void)
707 {
708         int index, max_index = class_devno_max(), dev_count = 0;
709
710         read_lock(&obd_dev_lock);
711         for (index = 0; index <= max_index; index++) {
712                 struct obd_device *obd = class_num2obd(index);
713                 if (obd != NULL)
714                         dev_count++;
715         }
716         read_unlock(&obd_dev_lock);
717
718         return dev_count;
719 }
720 EXPORT_SYMBOL(get_devices_count);
721
722 void class_obd_list(void)
723 {
724         char *status;
725         int i;
726
727         read_lock(&obd_dev_lock);
728         for (i = 0; i < class_devno_max(); i++) {
729                 struct obd_device *obd = class_num2obd(i);
730
731                 if (obd == NULL)
732                         continue;
733                 if (obd->obd_stopping)
734                         status = "ST";
735                 else if (obd->obd_set_up)
736                         status = "UP";
737                 else if (obd->obd_attached)
738                         status = "AT";
739                 else
740                         status = "--";
741                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742                          i, status, obd->obd_type->typ_name,
743                          obd->obd_name, obd->obd_uuid.uuid,
744                          atomic_read(&obd->obd_refcount));
745         }
746         read_unlock(&obd_dev_lock);
747 }
748
749 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
750  * specified, then only the client with that uuid is returned,
751  * otherwise any client connected to the tgt is returned.
752  */
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754                                          const char *type_name,
755                                          struct obd_uuid *grp_uuid)
756 {
757         int i;
758
759         read_lock(&obd_dev_lock);
760         for (i = 0; i < class_devno_max(); i++) {
761                 struct obd_device *obd = class_num2obd(i);
762
763                 if (obd == NULL)
764                         continue;
765                 if ((strncmp(obd->obd_type->typ_name, type_name,
766                              strlen(type_name)) == 0)) {
767                         if (obd_uuid_equals(tgt_uuid,
768                                             &obd->u.cli.cl_target_uuid) &&
769                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
770                                                          &obd->obd_uuid) : 1)) {
771                                 read_unlock(&obd_dev_lock);
772                                 return obd;
773                         }
774                 }
775         }
776         read_unlock(&obd_dev_lock);
777
778         return NULL;
779 }
780 EXPORT_SYMBOL(class_find_client_obd);
781
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783  * searching at *next, and if a device is found, the next index to look
784  * at is saved in *next. If next is NULL, then the first matching device
785  * will always be returned.
786  */
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
788 {
789         int i;
790
791         if (next == NULL)
792                 i = 0;
793         else if (*next >= 0 && *next < class_devno_max())
794                 i = *next;
795         else
796                 return NULL;
797
798         read_lock(&obd_dev_lock);
799         for (; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805                         if (next != NULL)
806                                 *next = i+1;
807                         read_unlock(&obd_dev_lock);
808                         return obd;
809                 }
810         }
811         read_unlock(&obd_dev_lock);
812
813         return NULL;
814 }
815 EXPORT_SYMBOL(class_devices_in_group);
816
817 /**
818  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
819  * adjust sptlrpc settings accordingly.
820  */
821 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
822 {
823         struct obd_device  *obd;
824         const char         *type;
825         int                 i, rc = 0, rc2;
826
827         LASSERT(namelen > 0);
828
829         read_lock(&obd_dev_lock);
830         for (i = 0; i < class_devno_max(); i++) {
831                 obd = class_num2obd(i);
832
833                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
834                         continue;
835
836                 /* only notify mdc, osc, osp, lwp, mdt, ost
837                  * because only these have a -sptlrpc llog */
838                 type = obd->obd_type->typ_name;
839                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
840                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
841                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
842                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
843                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
844                     strcmp(type, LUSTRE_OST_NAME) != 0)
845                         continue;
846
847                 if (strncmp(obd->obd_name, fsname, namelen))
848                         continue;
849
850                 class_incref(obd, __FUNCTION__, obd);
851                 read_unlock(&obd_dev_lock);
852                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
853                                          sizeof(KEY_SPTLRPC_CONF),
854                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
855                 rc = rc ? rc : rc2;
856                 class_decref(obd, __FUNCTION__, obd);
857                 read_lock(&obd_dev_lock);
858         }
859         read_unlock(&obd_dev_lock);
860         return rc;
861 }
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
863
864 void obd_cleanup_caches(void)
865 {
866         ENTRY;
867         if (obd_device_cachep) {
868                 kmem_cache_destroy(obd_device_cachep);
869                 obd_device_cachep = NULL;
870         }
871
872         EXIT;
873 }
874
875 int obd_init_caches(void)
876 {
877         int rc;
878         ENTRY;
879
880         LASSERT(obd_device_cachep == NULL);
881         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882                                 sizeof(struct obd_device),
883                                 0, 0, 0, sizeof(struct obd_device), NULL);
884         if (!obd_device_cachep)
885                 GOTO(out, rc = -ENOMEM);
886
887         RETURN(0);
888 out:
889         obd_cleanup_caches();
890         RETURN(rc);
891 }
892
893 static const char export_handle_owner[] = "export";
894
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
897 {
898         struct obd_export *export;
899         ENTRY;
900
901         if (!conn) {
902                 CDEBUG(D_CACHE, "looking for null handle\n");
903                 RETURN(NULL);
904         }
905
906         if (conn->cookie == -1) {  /* this means assign a new connection */
907                 CDEBUG(D_CACHE, "want a new connection\n");
908                 RETURN(NULL);
909         }
910
911         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912         export = class_handle2object(conn->cookie, export_handle_owner);
913         RETURN(export);
914 }
915 EXPORT_SYMBOL(class_conn2export);
916
917 struct obd_device *class_exp2obd(struct obd_export *exp)
918 {
919         if (exp)
920                 return exp->exp_obd;
921         return NULL;
922 }
923 EXPORT_SYMBOL(class_exp2obd);
924
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
926 {
927         struct obd_device *obd = exp->exp_obd;
928         if (obd == NULL)
929                 return NULL;
930         return obd->u.cli.cl_import;
931 }
932 EXPORT_SYMBOL(class_exp2cliimp);
933
934 /* Export management functions */
935 static void class_export_destroy(struct obd_export *exp)
936 {
937         struct obd_device *obd = exp->exp_obd;
938         ENTRY;
939
940         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941         LASSERT(obd != NULL);
942
943         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944                exp->exp_client_uuid.uuid, obd->obd_name);
945
946         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947         ptlrpc_connection_put(exp->exp_connection);
948
949         LASSERT(list_empty(&exp->exp_outstanding_replies));
950         LASSERT(list_empty(&exp->exp_uncommitted_replies));
951         LASSERT(list_empty(&exp->exp_req_replay_queue));
952         LASSERT(list_empty(&exp->exp_hp_rpcs));
953         obd_destroy_export(exp);
954         /* self export doesn't hold a reference to an obd, although it
955          * exists until freeing of the obd */
956         if (exp != obd->obd_self_export)
957                 class_decref(obd, "export", exp);
958
959         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
960         kfree_rcu(exp, exp_handle.h_rcu);
961         EXIT;
962 }
963
964 struct obd_export *class_export_get(struct obd_export *exp)
965 {
966         refcount_inc(&exp->exp_handle.h_ref);
967         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
968                refcount_read(&exp->exp_handle.h_ref));
969         return exp;
970 }
971 EXPORT_SYMBOL(class_export_get);
972
973 void class_export_put(struct obd_export *exp)
974 {
975         LASSERT(exp != NULL);
976         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
977         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
978         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
979                refcount_read(&exp->exp_handle.h_ref) - 1);
980
981         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
982                 struct obd_device *obd = exp->exp_obd;
983
984                 CDEBUG(D_IOCTL, "final put %p/%s\n",
985                        exp, exp->exp_client_uuid.uuid);
986
987                 /* release nid stat refererence */
988                 lprocfs_exp_cleanup(exp);
989
990                 if (exp == obd->obd_self_export) {
991                         /* self export should be destroyed without
992                          * zombie thread as it doesn't hold a
993                          * reference to obd and doesn't hold any
994                          * resources */
995                         class_export_destroy(exp);
996                         /* self export is destroyed, no class
997                          * references exist and it is safe to free
998                          * obd */
999                         class_free_dev(obd);
1000                 } else {
1001                         LASSERT(!list_empty(&exp->exp_obd_chain));
1002                         obd_zombie_export_add(exp);
1003                 }
1004
1005         }
1006 }
1007 EXPORT_SYMBOL(class_export_put);
1008
1009 static void obd_zombie_exp_cull(struct work_struct *ws)
1010 {
1011         struct obd_export *export;
1012
1013         export = container_of(ws, struct obd_export, exp_zombie_work);
1014         class_export_destroy(export);
1015 }
1016
1017 /* Creates a new export, adds it to the hash table, and returns a
1018  * pointer to it. The refcount is 2: one for the hash reference, and
1019  * one for the pointer returned by this function. */
1020 struct obd_export *__class_new_export(struct obd_device *obd,
1021                                       struct obd_uuid *cluuid, bool is_self)
1022 {
1023         struct obd_export *export;
1024         int rc = 0;
1025         ENTRY;
1026
1027         OBD_ALLOC_PTR(export);
1028         if (!export)
1029                 return ERR_PTR(-ENOMEM);
1030
1031         export->exp_conn_cnt = 0;
1032         export->exp_lock_hash = NULL;
1033         export->exp_flock_hash = NULL;
1034         /* 2 = class_handle_hash + last */
1035         refcount_set(&export->exp_handle.h_ref, 2);
1036         atomic_set(&export->exp_rpc_count, 0);
1037         atomic_set(&export->exp_cb_count, 0);
1038         atomic_set(&export->exp_locks_count, 0);
1039 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1040         INIT_LIST_HEAD(&export->exp_locks_list);
1041         spin_lock_init(&export->exp_locks_list_guard);
1042 #endif
1043         atomic_set(&export->exp_replay_count, 0);
1044         export->exp_obd = obd;
1045         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1046         spin_lock_init(&export->exp_uncommitted_replies_lock);
1047         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1048         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1049         INIT_HLIST_NODE(&export->exp_handle.h_link);
1050         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1051         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1052         class_handle_hash(&export->exp_handle, export_handle_owner);
1053         export->exp_last_request_time = ktime_get_real_seconds();
1054         spin_lock_init(&export->exp_lock);
1055         spin_lock_init(&export->exp_rpc_lock);
1056         INIT_HLIST_NODE(&export->exp_gen_hash);
1057         spin_lock_init(&export->exp_bl_list_lock);
1058         INIT_LIST_HEAD(&export->exp_bl_list);
1059         INIT_LIST_HEAD(&export->exp_stale_list);
1060         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1061
1062         export->exp_sp_peer = LUSTRE_SP_ANY;
1063         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1064         export->exp_client_uuid = *cluuid;
1065         obd_init_export(export);
1066
1067         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1068
1069         spin_lock(&obd->obd_dev_lock);
1070         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1071                 /* shouldn't happen, but might race */
1072                 if (obd->obd_stopping)
1073                         GOTO(exit_unlock, rc = -ENODEV);
1074
1075                 rc = obd_uuid_add(obd, export);
1076                 if (rc != 0) {
1077                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1078                                       obd->obd_name, cluuid->uuid, rc);
1079                         GOTO(exit_unlock, rc = -EALREADY);
1080                 }
1081         }
1082
1083         if (!is_self) {
1084                 class_incref(obd, "export", export);
1085                 list_add_tail(&export->exp_obd_chain_timed,
1086                               &obd->obd_exports_timed);
1087                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1088                 obd->obd_num_exports++;
1089         } else {
1090                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1091                 INIT_LIST_HEAD(&export->exp_obd_chain);
1092         }
1093         spin_unlock(&obd->obd_dev_lock);
1094         RETURN(export);
1095
1096 exit_unlock:
1097         spin_unlock(&obd->obd_dev_lock);
1098         class_handle_unhash(&export->exp_handle);
1099         obd_destroy_export(export);
1100         OBD_FREE_PTR(export);
1101         return ERR_PTR(rc);
1102 }
1103
1104 struct obd_export *class_new_export(struct obd_device *obd,
1105                                     struct obd_uuid *uuid)
1106 {
1107         return __class_new_export(obd, uuid, false);
1108 }
1109 EXPORT_SYMBOL(class_new_export);
1110
1111 struct obd_export *class_new_export_self(struct obd_device *obd,
1112                                          struct obd_uuid *uuid)
1113 {
1114         return __class_new_export(obd, uuid, true);
1115 }
1116
1117 void class_unlink_export(struct obd_export *exp)
1118 {
1119         class_handle_unhash(&exp->exp_handle);
1120
1121         if (exp->exp_obd->obd_self_export == exp) {
1122                 class_export_put(exp);
1123                 return;
1124         }
1125
1126         spin_lock(&exp->exp_obd->obd_dev_lock);
1127         /* delete an uuid-export hashitem from hashtables */
1128         if (exp != exp->exp_obd->obd_self_export)
1129                 obd_uuid_del(exp->exp_obd, exp);
1130
1131 #ifdef HAVE_SERVER_SUPPORT
1132         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1133                 struct tg_export_data   *ted = &exp->exp_target_data;
1134                 struct cfs_hash         *hash;
1135
1136                 /* Because obd_gen_hash will not be released until
1137                  * class_cleanup(), so hash should never be NULL here */
1138                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1139                 LASSERT(hash != NULL);
1140                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1141                              &exp->exp_gen_hash);
1142                 cfs_hash_putref(hash);
1143         }
1144 #endif /* HAVE_SERVER_SUPPORT */
1145
1146         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1147         list_del_init(&exp->exp_obd_chain_timed);
1148         exp->exp_obd->obd_num_exports--;
1149         spin_unlock(&exp->exp_obd->obd_dev_lock);
1150         atomic_inc(&obd_stale_export_num);
1151
1152         /* A reference is kept by obd_stale_exports list */
1153         obd_stale_export_put(exp);
1154 }
1155 EXPORT_SYMBOL(class_unlink_export);
1156
1157 /* Import management functions */
1158 static void obd_zombie_import_free(struct obd_import *imp)
1159 {
1160         ENTRY;
1161
1162         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1163                imp->imp_obd->obd_name);
1164
1165         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1166
1167         ptlrpc_connection_put(imp->imp_connection);
1168
1169         while (!list_empty(&imp->imp_conn_list)) {
1170                 struct obd_import_conn *imp_conn;
1171
1172                 imp_conn = list_first_entry(&imp->imp_conn_list,
1173                                             struct obd_import_conn, oic_item);
1174                 list_del_init(&imp_conn->oic_item);
1175                 ptlrpc_connection_put(imp_conn->oic_conn);
1176                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1177         }
1178
1179         LASSERT(imp->imp_sec == NULL);
1180         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1181                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1182         class_decref(imp->imp_obd, "import", imp);
1183         OBD_FREE_PTR(imp);
1184         EXIT;
1185 }
1186
1187 struct obd_import *class_import_get(struct obd_import *import)
1188 {
1189         refcount_inc(&import->imp_refcount);
1190         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1191                refcount_read(&import->imp_refcount),
1192                import->imp_obd->obd_name);
1193         return import;
1194 }
1195 EXPORT_SYMBOL(class_import_get);
1196
1197 void class_import_put(struct obd_import *imp)
1198 {
1199         ENTRY;
1200
1201         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1202
1203         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1204                refcount_read(&imp->imp_refcount) - 1,
1205                imp->imp_obd->obd_name);
1206
1207         if (refcount_dec_and_test(&imp->imp_refcount)) {
1208                 CDEBUG(D_INFO, "final put import %p\n", imp);
1209                 obd_zombie_import_add(imp);
1210         }
1211
1212         EXIT;
1213 }
1214 EXPORT_SYMBOL(class_import_put);
1215
1216 static void init_imp_at(struct imp_at *at) {
1217         int i;
1218         at_init(&at->iat_net_latency, 0, 0);
1219         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1220                 /* max service estimates are tracked on the server side, so
1221                    don't use the AT history here, just use the last reported
1222                    val. (But keep hist for proc histogram, worst_ever) */
1223                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1224                         AT_FLG_NOHIST);
1225         }
1226 }
1227
1228 static void obd_zombie_imp_cull(struct work_struct *ws)
1229 {
1230         struct obd_import *import;
1231
1232         import = container_of(ws, struct obd_import, imp_zombie_work);
1233         obd_zombie_import_free(import);
1234 }
1235
1236 struct obd_import *class_new_import(struct obd_device *obd)
1237 {
1238         struct obd_import *imp;
1239         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1240
1241         OBD_ALLOC(imp, sizeof(*imp));
1242         if (imp == NULL)
1243                 return NULL;
1244
1245         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1246         INIT_LIST_HEAD(&imp->imp_replay_list);
1247         INIT_LIST_HEAD(&imp->imp_sending_list);
1248         INIT_LIST_HEAD(&imp->imp_delayed_list);
1249         INIT_LIST_HEAD(&imp->imp_committed_list);
1250         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1251         imp->imp_known_replied_xid = 0;
1252         imp->imp_replay_cursor = &imp->imp_committed_list;
1253         spin_lock_init(&imp->imp_lock);
1254         imp->imp_last_success_conn = 0;
1255         imp->imp_state = LUSTRE_IMP_NEW;
1256         imp->imp_obd = class_incref(obd, "import", imp);
1257         rwlock_init(&imp->imp_sec_lock);
1258         init_waitqueue_head(&imp->imp_recovery_waitq);
1259         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1260
1261         if (curr_pid_ns && curr_pid_ns->child_reaper)
1262                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1263         else
1264                 imp->imp_sec_refpid = 1;
1265
1266         refcount_set(&imp->imp_refcount, 2);
1267         atomic_set(&imp->imp_unregistering, 0);
1268         atomic_set(&imp->imp_reqs, 0);
1269         atomic_set(&imp->imp_inflight, 0);
1270         atomic_set(&imp->imp_replay_inflight, 0);
1271         init_waitqueue_head(&imp->imp_replay_waitq);
1272         atomic_set(&imp->imp_inval_count, 0);
1273         INIT_LIST_HEAD(&imp->imp_conn_list);
1274         init_imp_at(&imp->imp_at);
1275
1276         /* the default magic is V2, will be used in connect RPC, and
1277          * then adjusted according to the flags in request/reply. */
1278         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1279
1280         return imp;
1281 }
1282 EXPORT_SYMBOL(class_new_import);
1283
1284 void class_destroy_import(struct obd_import *import)
1285 {
1286         LASSERT(import != NULL);
1287         LASSERT(import != LP_POISON);
1288
1289         spin_lock(&import->imp_lock);
1290         import->imp_generation++;
1291         spin_unlock(&import->imp_lock);
1292         class_import_put(import);
1293 }
1294 EXPORT_SYMBOL(class_destroy_import);
1295
1296 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1297
1298 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1299 {
1300         spin_lock(&exp->exp_locks_list_guard);
1301
1302         LASSERT(lock->l_exp_refs_nr >= 0);
1303
1304         if (lock->l_exp_refs_target != NULL &&
1305             lock->l_exp_refs_target != exp) {
1306                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1307                               exp, lock, lock->l_exp_refs_target);
1308         }
1309         if ((lock->l_exp_refs_nr ++) == 0) {
1310                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1311                 lock->l_exp_refs_target = exp;
1312         }
1313         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1314                lock, exp, lock->l_exp_refs_nr);
1315         spin_unlock(&exp->exp_locks_list_guard);
1316 }
1317 EXPORT_SYMBOL(__class_export_add_lock_ref);
1318
1319 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1320 {
1321         spin_lock(&exp->exp_locks_list_guard);
1322         LASSERT(lock->l_exp_refs_nr > 0);
1323         if (lock->l_exp_refs_target != exp) {
1324                 LCONSOLE_WARN("lock %p, "
1325                               "mismatching export pointers: %p, %p\n",
1326                               lock, lock->l_exp_refs_target, exp);
1327         }
1328         if (-- lock->l_exp_refs_nr == 0) {
1329                 list_del_init(&lock->l_exp_refs_link);
1330                 lock->l_exp_refs_target = NULL;
1331         }
1332         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1333                lock, exp, lock->l_exp_refs_nr);
1334         spin_unlock(&exp->exp_locks_list_guard);
1335 }
1336 EXPORT_SYMBOL(__class_export_del_lock_ref);
1337 #endif
1338
1339 /* A connection defines an export context in which preallocation can
1340    be managed. This releases the export pointer reference, and returns
1341    the export handle, so the export refcount is 1 when this function
1342    returns. */
1343 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1344                   struct obd_uuid *cluuid)
1345 {
1346         struct obd_export *export;
1347         LASSERT(conn != NULL);
1348         LASSERT(obd != NULL);
1349         LASSERT(cluuid != NULL);
1350         ENTRY;
1351
1352         export = class_new_export(obd, cluuid);
1353         if (IS_ERR(export))
1354                 RETURN(PTR_ERR(export));
1355
1356         conn->cookie = export->exp_handle.h_cookie;
1357         class_export_put(export);
1358
1359         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1360                cluuid->uuid, conn->cookie);
1361         RETURN(0);
1362 }
1363 EXPORT_SYMBOL(class_connect);
1364
1365 /* if export is involved in recovery then clean up related things */
1366 static void class_export_recovery_cleanup(struct obd_export *exp)
1367 {
1368         struct obd_device *obd = exp->exp_obd;
1369
1370         spin_lock(&obd->obd_recovery_task_lock);
1371         if (obd->obd_recovering) {
1372                 if (exp->exp_in_recovery) {
1373                         spin_lock(&exp->exp_lock);
1374                         exp->exp_in_recovery = 0;
1375                         spin_unlock(&exp->exp_lock);
1376                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1377                         atomic_dec(&obd->obd_connected_clients);
1378                 }
1379
1380                 /* if called during recovery then should update
1381                  * obd_stale_clients counter,
1382                  * lightweight exports are not counted */
1383                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1384                         exp->exp_obd->obd_stale_clients++;
1385         }
1386         spin_unlock(&obd->obd_recovery_task_lock);
1387
1388         spin_lock(&exp->exp_lock);
1389         /** Cleanup req replay fields */
1390         if (exp->exp_req_replay_needed) {
1391                 exp->exp_req_replay_needed = 0;
1392
1393                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1394                 atomic_dec(&obd->obd_req_replay_clients);
1395         }
1396
1397         /** Cleanup lock replay data */
1398         if (exp->exp_lock_replay_needed) {
1399                 exp->exp_lock_replay_needed = 0;
1400
1401                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1402                 atomic_dec(&obd->obd_lock_replay_clients);
1403         }
1404         spin_unlock(&exp->exp_lock);
1405 }
1406
1407 /* This function removes 1-3 references from the export:
1408  * 1 - for export pointer passed
1409  * and if disconnect really need
1410  * 2 - removing from hash
1411  * 3 - in client_unlink_export
1412  * The export pointer passed to this function can destroyed */
1413 int class_disconnect(struct obd_export *export)
1414 {
1415         int already_disconnected;
1416         ENTRY;
1417
1418         if (export == NULL) {
1419                 CWARN("attempting to free NULL export %p\n", export);
1420                 RETURN(-EINVAL);
1421         }
1422
1423         spin_lock(&export->exp_lock);
1424         already_disconnected = export->exp_disconnected;
1425         export->exp_disconnected = 1;
1426 #ifdef HAVE_SERVER_SUPPORT
1427         /*  We hold references of export for uuid hash
1428          *  and nid_hash and export link at least. So
1429          *  it is safe to call rh*table_remove_fast in
1430          *  there.
1431          */
1432         obd_nid_del(export->exp_obd, export);
1433 #endif /* HAVE_SERVER_SUPPORT */
1434         spin_unlock(&export->exp_lock);
1435
1436         /* class_cleanup(), abort_recovery(), and class_fail_export()
1437          * all end up in here, and if any of them race we shouldn't
1438          * call extra class_export_puts(). */
1439         if (already_disconnected)
1440                 GOTO(no_disconn, already_disconnected);
1441
1442         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1443                export->exp_handle.h_cookie);
1444
1445         class_export_recovery_cleanup(export);
1446         class_unlink_export(export);
1447 no_disconn:
1448         class_export_put(export);
1449         RETURN(0);
1450 }
1451 EXPORT_SYMBOL(class_disconnect);
1452
1453 /* Return non-zero for a fully connected export */
1454 int class_connected_export(struct obd_export *exp)
1455 {
1456         int connected = 0;
1457
1458         if (exp) {
1459                 spin_lock(&exp->exp_lock);
1460                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1461                 spin_unlock(&exp->exp_lock);
1462         }
1463         return connected;
1464 }
1465 EXPORT_SYMBOL(class_connected_export);
1466
1467 static void class_disconnect_export_list(struct list_head *list,
1468                                          enum obd_option flags)
1469 {
1470         int rc;
1471         struct obd_export *exp;
1472         ENTRY;
1473
1474         /* It's possible that an export may disconnect itself, but
1475          * nothing else will be added to this list. */
1476         while (!list_empty(list)) {
1477                 exp = list_first_entry(list, struct obd_export,
1478                                        exp_obd_chain);
1479                 /* need for safe call CDEBUG after obd_disconnect */
1480                 class_export_get(exp);
1481
1482                 spin_lock(&exp->exp_lock);
1483                 exp->exp_flags = flags;
1484                 spin_unlock(&exp->exp_lock);
1485
1486                 if (obd_uuid_equals(&exp->exp_client_uuid,
1487                                     &exp->exp_obd->obd_uuid)) {
1488                         CDEBUG(D_HA,
1489                                "exp %p export uuid == obd uuid, don't discon\n",
1490                                exp);
1491                         /* Need to delete this now so we don't end up pointing
1492                          * to work_list later when this export is cleaned up. */
1493                         list_del_init(&exp->exp_obd_chain);
1494                         class_export_put(exp);
1495                         continue;
1496                 }
1497
1498                 class_export_get(exp);
1499                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1500                        "last request at %lld\n",
1501                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1502                        exp, exp->exp_last_request_time);
1503                 /* release one export reference anyway */
1504                 rc = obd_disconnect(exp);
1505
1506                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1507                        obd_export_nid2str(exp), exp, rc);
1508                 class_export_put(exp);
1509         }
1510         EXIT;
1511 }
1512
1513 void class_disconnect_exports(struct obd_device *obd)
1514 {
1515         LIST_HEAD(work_list);
1516         ENTRY;
1517
1518         /* Move all of the exports from obd_exports to a work list, en masse. */
1519         spin_lock(&obd->obd_dev_lock);
1520         list_splice_init(&obd->obd_exports, &work_list);
1521         list_splice_init(&obd->obd_delayed_exports, &work_list);
1522         spin_unlock(&obd->obd_dev_lock);
1523
1524         if (!list_empty(&work_list)) {
1525                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1526                        "disconnecting them\n", obd->obd_minor, obd);
1527                 class_disconnect_export_list(&work_list,
1528                                              exp_flags_from_obd(obd));
1529         } else
1530                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1531                        obd->obd_minor, obd);
1532         EXIT;
1533 }
1534 EXPORT_SYMBOL(class_disconnect_exports);
1535
1536 /* Remove exports that have not completed recovery.
1537  */
1538 void class_disconnect_stale_exports(struct obd_device *obd,
1539                                     int (*test_export)(struct obd_export *))
1540 {
1541         LIST_HEAD(work_list);
1542         struct obd_export *exp, *n;
1543         int evicted = 0;
1544         ENTRY;
1545
1546         spin_lock(&obd->obd_dev_lock);
1547         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1548                                  exp_obd_chain) {
1549                 /* don't count self-export as client */
1550                 if (obd_uuid_equals(&exp->exp_client_uuid,
1551                                     &exp->exp_obd->obd_uuid))
1552                         continue;
1553
1554                 /* don't evict clients which have no slot in last_rcvd
1555                  * (e.g. lightweight connection) */
1556                 if (exp->exp_target_data.ted_lr_idx == -1)
1557                         continue;
1558
1559                 spin_lock(&exp->exp_lock);
1560                 if (exp->exp_failed || test_export(exp)) {
1561                         spin_unlock(&exp->exp_lock);
1562                         continue;
1563                 }
1564                 exp->exp_failed = 1;
1565                 spin_unlock(&exp->exp_lock);
1566
1567                 list_move(&exp->exp_obd_chain, &work_list);
1568                 evicted++;
1569                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1570                        obd->obd_name, exp->exp_client_uuid.uuid,
1571                        obd_export_nid2str(exp));
1572                 print_export_data(exp, "EVICTING", 0, D_HA);
1573         }
1574         spin_unlock(&obd->obd_dev_lock);
1575
1576         if (evicted)
1577                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1578                               obd->obd_name, evicted);
1579
1580         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1581                                                  OBD_OPT_ABORT_RECOV);
1582         EXIT;
1583 }
1584 EXPORT_SYMBOL(class_disconnect_stale_exports);
1585
1586 void class_fail_export(struct obd_export *exp)
1587 {
1588         int rc, already_failed;
1589
1590         spin_lock(&exp->exp_lock);
1591         already_failed = exp->exp_failed;
1592         exp->exp_failed = 1;
1593         spin_unlock(&exp->exp_lock);
1594
1595         if (already_failed) {
1596                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1597                        exp, exp->exp_client_uuid.uuid);
1598                 return;
1599         }
1600
1601         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1602                exp, exp->exp_client_uuid.uuid);
1603
1604         if (obd_dump_on_timeout)
1605                 libcfs_debug_dumplog();
1606
1607         /* need for safe call CDEBUG after obd_disconnect */
1608         class_export_get(exp);
1609
1610         /* Most callers into obd_disconnect are removing their own reference
1611          * (request, for example) in addition to the one from the hash table.
1612          * We don't have such a reference here, so make one. */
1613         class_export_get(exp);
1614         rc = obd_disconnect(exp);
1615         if (rc)
1616                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1617         else
1618                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1619                        exp, exp->exp_client_uuid.uuid);
1620         class_export_put(exp);
1621 }
1622 EXPORT_SYMBOL(class_fail_export);
1623
1624 #ifdef HAVE_SERVER_SUPPORT
1625
1626 static int take_first(struct obd_export *exp, void *data)
1627 {
1628         struct obd_export **expp = data;
1629
1630         if (*expp)
1631                 /* already have one */
1632                 return 0;
1633         if (exp->exp_failed)
1634                 /* Don't want this one */
1635                 return 0;
1636         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1637                 /* Cannot get a ref on this one */
1638                 return 0;
1639         *expp = exp;
1640         return 1;
1641 }
1642
1643 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1644 {
1645         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1646         struct obd_export *doomed_exp;
1647         int exports_evicted = 0;
1648
1649         spin_lock(&obd->obd_dev_lock);
1650         /* umount has run already, so evict thread should leave
1651          * its task to umount thread now */
1652         if (obd->obd_stopping) {
1653                 spin_unlock(&obd->obd_dev_lock);
1654                 return exports_evicted;
1655         }
1656         spin_unlock(&obd->obd_dev_lock);
1657
1658         doomed_exp = NULL;
1659         while (obd_nid_export_for_each(obd, nid_key,
1660                                        take_first, &doomed_exp) > 0) {
1661
1662                 LASSERTF(doomed_exp != obd->obd_self_export,
1663                          "self-export is hashed by NID?\n");
1664
1665                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1666                               obd->obd_name,
1667                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1668                               obd_export_nid2str(doomed_exp));
1669
1670                 class_fail_export(doomed_exp);
1671                 class_export_put(doomed_exp);
1672                 exports_evicted++;
1673                 doomed_exp = NULL;
1674         }
1675
1676         if (!exports_evicted)
1677                 CDEBUG(D_HA,
1678                        "%s: can't disconnect NID '%s': no exports found\n",
1679                        obd->obd_name, nid);
1680         return exports_evicted;
1681 }
1682 EXPORT_SYMBOL(obd_export_evict_by_nid);
1683
1684 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1685 {
1686         struct obd_export *doomed_exp = NULL;
1687         struct obd_uuid doomed_uuid;
1688         int exports_evicted = 0;
1689
1690         spin_lock(&obd->obd_dev_lock);
1691         if (obd->obd_stopping) {
1692                 spin_unlock(&obd->obd_dev_lock);
1693                 return exports_evicted;
1694         }
1695         spin_unlock(&obd->obd_dev_lock);
1696
1697         obd_str2uuid(&doomed_uuid, uuid);
1698         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1699                 CERROR("%s: can't evict myself\n", obd->obd_name);
1700                 return exports_evicted;
1701         }
1702
1703         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1704         if (doomed_exp == NULL) {
1705                 CERROR("%s: can't disconnect %s: no exports found\n",
1706                        obd->obd_name, uuid);
1707         } else {
1708                 CWARN("%s: evicting %s at adminstrative request\n",
1709                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1710                 class_fail_export(doomed_exp);
1711                 class_export_put(doomed_exp);
1712                 obd_uuid_del(obd, doomed_exp);
1713                 exports_evicted++;
1714         }
1715
1716         return exports_evicted;
1717 }
1718 #endif /* HAVE_SERVER_SUPPORT */
1719
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; when set, print_export_data() calls it for each
 * export it prints with a non-zero "locks" argument.  Unset (NULL) by
 * default.
 */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1724
1725 static void print_export_data(struct obd_export *exp, const char *status,
1726                               int locks, int debug_level)
1727 {
1728         struct ptlrpc_reply_state *rs;
1729         struct ptlrpc_reply_state *first_reply = NULL;
1730         int nreplies = 0;
1731
1732         spin_lock(&exp->exp_lock);
1733         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1734                             rs_exp_list) {
1735                 if (nreplies == 0)
1736                         first_reply = rs;
1737                 nreplies++;
1738         }
1739         spin_unlock(&exp->exp_lock);
1740
1741         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1742                "%p %s %llu stale:%d\n",
1743                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1744                obd_export_nid2str(exp),
1745                refcount_read(&exp->exp_handle.h_ref),
1746                atomic_read(&exp->exp_rpc_count),
1747                atomic_read(&exp->exp_cb_count),
1748                atomic_read(&exp->exp_locks_count),
1749                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1750                nreplies, first_reply, nreplies > 3 ? "..." : "",
1751                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1752 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1753         if (locks && class_export_dump_hook != NULL)
1754                 class_export_dump_hook(exp);
1755 #endif
1756 }
1757
1758 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1759 {
1760         struct obd_export *exp;
1761
1762         spin_lock(&obd->obd_dev_lock);
1763         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1764                 print_export_data(exp, "ACTIVE", locks, debug_level);
1765         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1766                 print_export_data(exp, "UNLINKED", locks, debug_level);
1767         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1768                 print_export_data(exp, "DELAYED", locks, debug_level);
1769         spin_unlock(&obd->obd_dev_lock);
1770 }
1771
1772 void obd_exports_barrier(struct obd_device *obd)
1773 {
1774         int waited = 2;
1775         LASSERT(list_empty(&obd->obd_exports));
1776         spin_lock(&obd->obd_dev_lock);
1777         while (!list_empty(&obd->obd_unlinked_exports)) {
1778                 spin_unlock(&obd->obd_dev_lock);
1779                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1780                 if (waited > 5 && is_power_of_2(waited)) {
1781                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1782                                       "more than %d seconds. "
1783                                       "The obd refcount = %d. Is it stuck?\n",
1784                                       obd->obd_name, waited,
1785                                       atomic_read(&obd->obd_refcount));
1786                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1787                 }
1788                 waited *= 2;
1789                 spin_lock(&obd->obd_dev_lock);
1790         }
1791         spin_unlock(&obd->obd_dev_lock);
1792 }
1793 EXPORT_SYMBOL(obd_exports_barrier);
1794
1795 /**
1796  * Add export to the obd_zombe thread and notify it.
1797  */
1798 static void obd_zombie_export_add(struct obd_export *exp) {
1799         atomic_dec(&obd_stale_export_num);
1800         spin_lock(&exp->exp_obd->obd_dev_lock);
1801         LASSERT(!list_empty(&exp->exp_obd_chain));
1802         list_del_init(&exp->exp_obd_chain);
1803         spin_unlock(&exp->exp_obd->obd_dev_lock);
1804
1805         queue_work(zombie_wq, &exp->exp_zombie_work);
1806 }
1807
1808 /**
1809  * Add import to the obd_zombe thread and notify it.
1810  */
1811 static void obd_zombie_import_add(struct obd_import *imp) {
1812         LASSERT(imp->imp_sec == NULL);
1813
1814         queue_work(zombie_wq, &imp->imp_zombie_work);
1815 }
1816
1817 /**
1818  * wait when obd_zombie import/export queues become empty
1819  */
1820 void obd_zombie_barrier(void)
1821 {
1822         flush_workqueue(zombie_wq);
1823 }
1824 EXPORT_SYMBOL(obd_zombie_barrier);
1825
1826
1827 struct obd_export *obd_stale_export_get(void)
1828 {
1829         struct obd_export *exp = NULL;
1830         ENTRY;
1831
1832         spin_lock(&obd_stale_export_lock);
1833         if (!list_empty(&obd_stale_exports)) {
1834                 exp = list_first_entry(&obd_stale_exports,
1835                                        struct obd_export, exp_stale_list);
1836                 list_del_init(&exp->exp_stale_list);
1837         }
1838         spin_unlock(&obd_stale_export_lock);
1839
1840         if (exp) {
1841                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1842                        atomic_read(&obd_stale_export_num));
1843         }
1844         RETURN(exp);
1845 }
1846 EXPORT_SYMBOL(obd_stale_export_get);
1847
1848 void obd_stale_export_put(struct obd_export *exp)
1849 {
1850         ENTRY;
1851
1852         LASSERT(list_empty(&exp->exp_stale_list));
1853         if (exp->exp_lock_hash &&
1854             atomic_read(&exp->exp_lock_hash->hs_count)) {
1855                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1856                        atomic_read(&obd_stale_export_num));
1857
1858                 spin_lock_bh(&exp->exp_bl_list_lock);
1859                 spin_lock(&obd_stale_export_lock);
1860                 /* Add to the tail if there is no blocked locks,
1861                  * to the head otherwise. */
1862                 if (list_empty(&exp->exp_bl_list))
1863                         list_add_tail(&exp->exp_stale_list,
1864                                       &obd_stale_exports);
1865                 else
1866                         list_add(&exp->exp_stale_list,
1867                                  &obd_stale_exports);
1868
1869                 spin_unlock(&obd_stale_export_lock);
1870                 spin_unlock_bh(&exp->exp_bl_list_lock);
1871         } else {
1872                 class_export_put(exp);
1873         }
1874         EXIT;
1875 }
1876 EXPORT_SYMBOL(obd_stale_export_put);
1877
1878 /**
1879  * Adjust the position of the export in the stale list,
1880  * i.e. move to the head of the list if is needed.
1881  **/
1882 void obd_stale_export_adjust(struct obd_export *exp)
1883 {
1884         LASSERT(exp != NULL);
1885         spin_lock_bh(&exp->exp_bl_list_lock);
1886         spin_lock(&obd_stale_export_lock);
1887
1888         if (!list_empty(&exp->exp_stale_list) &&
1889             !list_empty(&exp->exp_bl_list))
1890                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1891
1892         spin_unlock(&obd_stale_export_lock);
1893         spin_unlock_bh(&exp->exp_bl_list_lock);
1894 }
1895 EXPORT_SYMBOL(obd_stale_export_adjust);
1896
1897 /**
1898  * start destroy zombie import/export thread
1899  */
1900 int obd_zombie_impexp_init(void)
1901 {
1902         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1903                                            0, CFS_CPT_ANY,
1904                                            cfs_cpt_number(cfs_cpt_tab));
1905
1906         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1907 }
1908
1909 /**
1910  * stop destroy zombie import/export thread
1911  */
1912 void obd_zombie_impexp_stop(void)
1913 {
1914         destroy_workqueue(zombie_wq);
1915         LASSERT(list_empty(&obd_stale_exports));
1916 }
1917
1918 /***** Kernel-userspace comm helpers *******/
1919
1920 /* Get length of entire message, including header */
1921 int kuc_len(int payload_len)
1922 {
1923         return sizeof(struct kuc_hdr) + payload_len;
1924 }
1925 EXPORT_SYMBOL(kuc_len);
1926
1927 /* Get a pointer to kuc header, given a ptr to the payload
1928  * @param p Pointer to payload area
1929  * @returns Pointer to kuc header
1930  */
1931 struct kuc_hdr * kuc_ptr(void *p)
1932 {
1933         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1934         LASSERT(lh->kuc_magic == KUC_MAGIC);
1935         return lh;
1936 }
1937 EXPORT_SYMBOL(kuc_ptr);
1938
1939 /* Alloc space for a message, and fill in header
1940  * @return Pointer to payload area
1941  */
1942 void *kuc_alloc(int payload_len, int transport, int type)
1943 {
1944         struct kuc_hdr *lh;
1945         int len = kuc_len(payload_len);
1946
1947         OBD_ALLOC(lh, len);
1948         if (lh == NULL)
1949                 return ERR_PTR(-ENOMEM);
1950
1951         lh->kuc_magic = KUC_MAGIC;
1952         lh->kuc_transport = transport;
1953         lh->kuc_msgtype = type;
1954         lh->kuc_msglen = len;
1955
1956         return (void *)(lh + 1);
1957 }
1958 EXPORT_SYMBOL(kuc_alloc);
1959
/* Free a message.  Takes the pointer to the payload area and the same
 * payload length that was used to size the message.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);
	int len = kuc_len(payload_len);

	OBD_FREE(lh, len);
}
EXPORT_SYMBOL(kuc_free);
1967
1968 struct obd_request_slot_waiter {
1969         struct list_head        orsw_entry;
1970         wait_queue_head_t       orsw_waitq;
1971         bool                    orsw_signaled;
1972 };
1973
1974 static bool obd_request_slot_avail(struct client_obd *cli,
1975                                    struct obd_request_slot_waiter *orsw)
1976 {
1977         bool avail;
1978
1979         spin_lock(&cli->cl_loi_list_lock);
1980         avail = !!list_empty(&orsw->orsw_entry);
1981         spin_unlock(&cli->cl_loi_list_lock);
1982
1983         return avail;
1984 };
1985
1986 /*
1987  * For network flow control, the RPC sponsor needs to acquire a credit
1988  * before sending the RPC. The credits count for a connection is defined
1989  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1990  * the subsequent RPC sponsors need to wait until others released their
1991  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1992  */
1993 int obd_get_request_slot(struct client_obd *cli)
1994 {
1995         struct obd_request_slot_waiter   orsw;
1996         int                              rc;
1997
1998         spin_lock(&cli->cl_loi_list_lock);
1999         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2000                 cli->cl_rpcs_in_flight++;
2001                 spin_unlock(&cli->cl_loi_list_lock);
2002                 return 0;
2003         }
2004
2005         init_waitqueue_head(&orsw.orsw_waitq);
2006         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2007         orsw.orsw_signaled = false;
2008         spin_unlock(&cli->cl_loi_list_lock);
2009
2010         rc = l_wait_event_abortable(orsw.orsw_waitq,
2011                                     obd_request_slot_avail(cli, &orsw) ||
2012                                     orsw.orsw_signaled);
2013
2014         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2015          * freed but other (such as obd_put_request_slot) is using it. */
2016         spin_lock(&cli->cl_loi_list_lock);
2017         if (rc != 0) {
2018                 if (!orsw.orsw_signaled) {
2019                         if (list_empty(&orsw.orsw_entry))
2020                                 cli->cl_rpcs_in_flight--;
2021                         else
2022                                 list_del(&orsw.orsw_entry);
2023                 }
2024                 rc = -EINTR;
2025         }
2026
2027         if (orsw.orsw_signaled) {
2028                 LASSERT(list_empty(&orsw.orsw_entry));
2029
2030                 rc = -EINTR;
2031         }
2032         spin_unlock(&cli->cl_loi_list_lock);
2033
2034         return rc;
2035 }
2036 EXPORT_SYMBOL(obd_get_request_slot);
2037
2038 void obd_put_request_slot(struct client_obd *cli)
2039 {
2040         struct obd_request_slot_waiter *orsw;
2041
2042         spin_lock(&cli->cl_loi_list_lock);
2043         cli->cl_rpcs_in_flight--;
2044
2045         /* If there is free slot, wakeup the first waiter. */
2046         if (!list_empty(&cli->cl_flight_waiters) &&
2047             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2048                 orsw = list_first_entry(&cli->cl_flight_waiters,
2049                                         struct obd_request_slot_waiter,
2050                                         orsw_entry);
2051                 list_del_init(&orsw->orsw_entry);
2052                 cli->cl_rpcs_in_flight++;
2053                 wake_up(&orsw->orsw_waitq);
2054         }
2055         spin_unlock(&cli->cl_loi_list_lock);
2056 }
2057 EXPORT_SYMBOL(obd_put_request_slot);
2058
2059 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2060 {
2061         return cli->cl_max_rpcs_in_flight;
2062 }
2063 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2064
2065 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2066 {
2067         struct obd_request_slot_waiter *orsw;
2068         __u32                           old;
2069         int                             diff;
2070         int                             i;
2071         int                             rc;
2072
2073         if (max > OBD_MAX_RIF_MAX || max < 1)
2074                 return -ERANGE;
2075
2076         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2077                cli->cl_import->imp_obd->obd_name, max,
2078                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2079
2080         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2081                    LUSTRE_MDC_NAME) == 0) {
2082                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2083                  * strictly lower that max_rpcs_in_flight */
2084                 if (max < 2) {
2085                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2086                                cli->cl_import->imp_obd->obd_name);
2087                         return -ERANGE;
2088                 }
2089                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2090                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2091                         if (rc != 0)
2092                                 return rc;
2093                 }
2094         }
2095
2096         spin_lock(&cli->cl_loi_list_lock);
2097         old = cli->cl_max_rpcs_in_flight;
2098         cli->cl_max_rpcs_in_flight = max;
2099         client_adjust_max_dirty(cli);
2100
2101         diff = max - old;
2102
2103         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2104         for (i = 0; i < diff; i++) {
2105                 if (list_empty(&cli->cl_flight_waiters))
2106                         break;
2107
2108                 orsw = list_first_entry(&cli->cl_flight_waiters,
2109                                         struct obd_request_slot_waiter,
2110                                         orsw_entry);
2111                 list_del_init(&orsw->orsw_entry);
2112                 cli->cl_rpcs_in_flight++;
2113                 wake_up(&orsw->orsw_waitq);
2114         }
2115         spin_unlock(&cli->cl_loi_list_lock);
2116
2117         return 0;
2118 }
2119 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2120
2121 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2122 {
2123         return cli->cl_max_mod_rpcs_in_flight;
2124 }
2125 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2126
2127 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2128 {
2129         struct obd_connect_data *ocd;
2130         __u16 maxmodrpcs;
2131         __u16 prev;
2132
2133         if (max > OBD_MAX_RIF_MAX || max < 1)
2134                 return -ERANGE;
2135
2136         ocd = &cli->cl_import->imp_connect_data;
2137         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2138                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2139                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2140
2141         if (max == OBD_MAX_RIF_MAX)
2142                 max = OBD_MAX_RIF_MAX - 1;
2143
2144         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2145          * increase this value, also bump up max_rpcs_in_flight to match.
2146          */
2147         if (max >= cli->cl_max_rpcs_in_flight) {
2148                 CDEBUG(D_INFO,
2149                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2150                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2151                 obd_set_max_rpcs_in_flight(cli, max + 1);
2152         }
2153
2154         /* cannot exceed max modify RPCs in flight supported by the server,
2155          * but verify ocd_connect_flags is at least initialized first.  If
2156          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2157          */
2158         if (!ocd->ocd_connect_flags) {
2159                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2160         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2161                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2162                 if (maxmodrpcs == 0) { /* connection not finished yet */
2163                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2164                         CDEBUG(D_INFO,
2165                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2166                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2167                 }
2168         } else {
2169                 maxmodrpcs = 1;
2170         }
2171         if (max > maxmodrpcs) {
2172                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2173                        cli->cl_import->imp_obd->obd_name,
2174                        max, maxmodrpcs);
2175                 return -ERANGE;
2176         }
2177
2178         spin_lock(&cli->cl_mod_rpcs_lock);
2179
2180         prev = cli->cl_max_mod_rpcs_in_flight;
2181         cli->cl_max_mod_rpcs_in_flight = max;
2182
2183         /* wakeup waiters if limit has been increased */
2184         if (cli->cl_max_mod_rpcs_in_flight > prev)
2185                 wake_up(&cli->cl_mod_rpcs_waitq);
2186
2187         spin_unlock(&cli->cl_mod_rpcs_lock);
2188
2189         return 0;
2190 }
2191 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2192
2193 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2194                                struct seq_file *seq)
2195 {
2196         unsigned long mod_tot = 0, mod_cum;
2197         struct timespec64 now;
2198         int i;
2199
2200         ktime_get_real_ts64(&now);
2201
2202         spin_lock(&cli->cl_mod_rpcs_lock);
2203
2204         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2205                    (s64)now.tv_sec, now.tv_nsec);
2206         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2207                    cli->cl_mod_rpcs_in_flight);
2208
2209         seq_printf(seq, "\n\t\t\tmodify\n");
2210         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2211
2212         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2213
2214         mod_cum = 0;
2215         for (i = 0; i < OBD_HIST_MAX; i++) {
2216                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2217                 mod_cum += mod;
2218                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2219                            i, mod, pct(mod, mod_tot),
2220                            pct(mod_cum, mod_tot));
2221                 if (mod_cum == mod_tot)
2222                         break;
2223         }
2224
2225         spin_unlock(&cli->cl_mod_rpcs_lock);
2226
2227         return 0;
2228 }
2229 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2230
2231 /* The number of modify RPCs sent in parallel is limited
2232  * because the server has a finite number of slots per client to
2233  * store request result and ensure reply reconstruction when needed.
2234  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2235  * that takes into account server limit and cl_max_rpcs_in_flight
2236  * value.
2237  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2238  * one close request is allowed above the maximum.
2239  */
2240 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2241                                                  bool close_req)
2242 {
2243         bool avail;
2244
2245         /* A slot is available if
2246          * - number of modify RPCs in flight is less than the max
2247          * - it's a close RPC and no other close request is in flight
2248          */
2249         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2250                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2251
2252         return avail;
2253 }
2254
2255 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2256                                          bool close_req)
2257 {
2258         bool avail;
2259
2260         spin_lock(&cli->cl_mod_rpcs_lock);
2261         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2262         spin_unlock(&cli->cl_mod_rpcs_lock);
2263         return avail;
2264 }
2265
2266
2267 /* Get a modify RPC slot from the obd client @cli according
2268  * to the kind of operation @opc that is going to be sent
2269  * and the intent @it of the operation if it applies.
2270  * If the maximum number of modify RPCs in flight is reached
2271  * the thread is put to sleep.
2272  * Returns the tag to be set in the request message. Tag 0
2273  * is reserved for non-modifying requests.
2274  */
2275 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2276 {
2277         bool                    close_req = false;
2278         __u16                   i, max;
2279
2280         if (opc == MDS_CLOSE)
2281                 close_req = true;
2282
2283         do {
2284                 spin_lock(&cli->cl_mod_rpcs_lock);
2285                 max = cli->cl_max_mod_rpcs_in_flight;
2286                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2287                         /* there is a slot available */
2288                         cli->cl_mod_rpcs_in_flight++;
2289                         if (close_req)
2290                                 cli->cl_close_rpcs_in_flight++;
2291                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2292                                          cli->cl_mod_rpcs_in_flight);
2293                         /* find a free tag */
2294                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2295                                                 max + 1);
2296                         LASSERT(i < OBD_MAX_RIF_MAX);
2297                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2298                         spin_unlock(&cli->cl_mod_rpcs_lock);
2299                         /* tag 0 is reserved for non-modify RPCs */
2300
2301                         CDEBUG(D_RPCTRACE,
2302                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2303                                cli->cl_import->imp_obd->obd_name,
2304                                i + 1, opc, max);
2305
2306                         return i + 1;
2307                 }
2308                 spin_unlock(&cli->cl_mod_rpcs_lock);
2309
2310                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2311                        "opc %u, max %hu\n",
2312                        cli->cl_import->imp_obd->obd_name, opc, max);
2313
2314                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2315                                           obd_mod_rpc_slot_avail(cli,
2316                                                                  close_req));
2317         } while (true);
2318 }
2319 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2320
2321 /* Put a modify RPC slot from the obd client @cli according
2322  * to the kind of operation @opc that has been sent.
2323  */
2324 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2325 {
2326         bool                    close_req = false;
2327
2328         if (tag == 0)
2329                 return;
2330
2331         if (opc == MDS_CLOSE)
2332                 close_req = true;
2333
2334         spin_lock(&cli->cl_mod_rpcs_lock);
2335         cli->cl_mod_rpcs_in_flight--;
2336         if (close_req)
2337                 cli->cl_close_rpcs_in_flight--;
2338         /* release the tag in the bitmap */
2339         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2340         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2341         spin_unlock(&cli->cl_mod_rpcs_lock);
2342         wake_up(&cli->cl_mod_rpcs_waitq);
2343 }
2344 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2345