Whamcloud - gitweb
1e02f8930b2660a36e46d64a3314fdbb45ab141b
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         if (type->typ_md_ops)
176                 OBD_FREE_PTR(type->typ_md_ops);
177         if (type->typ_dt_ops)
178                 OBD_FREE_PTR(type->typ_dt_ops);
179
180         OBD_FREE(type, sizeof(*type));
181 }
182
183 static struct kobj_type class_ktype = {
184         .sysfs_ops      = &lustre_sysfs_ops,
185         .release        = class_sysfs_release,
186 };
187
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type that only carries directory entries.
 *
 * A new obd_type kobject named \a name is added under lustre_kset, a
 * debugfs directory of the same name is created below
 * debugfs_lustre_root and, when \a enable_proc is set, a proc directory
 * below proc_lustre_root.  The type is flagged with typ_sym_filter so
 * class_register_type() can later recognise and complete it.
 *
 * \param[in] name         name of the obd type / directories
 * \param[in] enable_proc  also create the compat proc directory
 *
 * \retval type              the newly created placeholder
 * \retval ERR_PTR(-EEXIST)  a type with this name already exists
 * \retval ERR_PTR(-ENOMEM)  allocation or debugfs directory creation failed
 * \retval ERR_PTR(errno)    kobject or debugfs error
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* Even on failure the kobject has been initialised, so it
		 * must be released with kobject_put(); that runs
		 * class_sysfs_release(), which frees \a type.  Returning
		 * without the put would leak the obd_type.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	if (IS_ERR_OR_NULL(symlink)) {
		rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		/* a missing compat proc entry is logged but not fatal */
		if (IS_ERR(type->typ_procroot)) {
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
234
235 #define CLASS_MAX_NAME 1024
236
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238                         bool enable_proc, struct lprocfs_vars *vars,
239                         const char *name, struct lu_device_type *ldt)
240 {
241         struct obd_type *type;
242         int rc;
243
244         ENTRY;
245         /* sanity check */
246         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
247
248         type = class_search_type(name);
249         if (type) {
250 #ifdef HAVE_SERVER_SUPPORT
251                 if (type->typ_sym_filter)
252                         goto dir_exist;
253 #endif /* HAVE_SERVER_SUPPORT */
254                 kobject_put(&type->typ_kobj);
255                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
256                 RETURN(-EEXIST);
257         }
258
259         OBD_ALLOC(type, sizeof(*type));
260         if (type == NULL)
261                 RETURN(-ENOMEM);
262
263         type->typ_kobj.kset = lustre_kset;
264         kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
266 dir_exist:
267 #endif /* HAVE_SERVER_SUPPORT */
268         OBD_ALLOC_PTR(type->typ_dt_ops);
269         OBD_ALLOC_PTR(type->typ_md_ops);
270
271         if (type->typ_dt_ops == NULL ||
272             type->typ_md_ops == NULL)
273                 GOTO (failed, rc = -ENOMEM);
274
275         *(type->typ_dt_ops) = *dt_ops;
276         /* md_ops is optional */
277         if (md_ops)
278                 *(type->typ_md_ops) = *md_ops;
279
280 #ifdef HAVE_SERVER_SUPPORT
281         if (type->typ_sym_filter) {
282                 type->typ_sym_filter = false;
283                 kobject_put(&type->typ_kobj);
284                 goto setup_ldt;
285         }
286 #endif
287 #ifdef CONFIG_PROC_FS
288         if (enable_proc && !type->typ_procroot) {
289                 type->typ_procroot = lprocfs_register(name,
290                                                       proc_lustre_root,
291                                                       NULL, type);
292                 if (IS_ERR(type->typ_procroot)) {
293                         rc = PTR_ERR(type->typ_procroot);
294                         type->typ_procroot = NULL;
295                         GOTO(failed, rc);
296                 }
297         }
298 #endif
299         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
300                                                     vars, type);
301         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
302                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
303                                              : -ENOMEM;
304                 type->typ_debugfs_entry = NULL;
305                 GOTO(failed, rc);
306         }
307
308         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
309         if (rc)
310                 GOTO(failed, rc);
311 #ifdef HAVE_SERVER_SUPPORT
312 setup_ldt:
313 #endif
314         if (ldt) {
315                 type->typ_lu = ldt;
316                 rc = lu_device_type_init(ldt);
317                 if (rc)
318                         GOTO(failed, rc);
319         }
320
321         RETURN(0);
322
323 failed:
324         kobject_put(&type->typ_kobj);
325
326         RETURN(rc);
327 }
328 EXPORT_SYMBOL(class_register_type);
329
330 int class_unregister_type(const char *name)
331 {
332         struct obd_type *type = class_search_type(name);
333         int rc = 0;
334         ENTRY;
335
336         if (!type) {
337                 CERROR("unknown obd type\n");
338                 RETURN(-EINVAL);
339         }
340
341         if (atomic_read(&type->typ_refcnt)) {
342                 CERROR("type %s has refcount (%d)\n", name,
343                        atomic_read(&type->typ_refcnt));
344                 /* This is a bad situation, let's make the best of it */
345                 /* Remove ops, but leave the name for debugging */
346                 OBD_FREE_PTR(type->typ_dt_ops);
347                 OBD_FREE_PTR(type->typ_md_ops);
348                 GOTO(out_put, rc = -EBUSY);
349         }
350
351         /* Put the final ref */
352         kobject_put(&type->typ_kobj);
353 out_put:
354         /* Put the ref returned by class_search_type() */
355         kobject_put(&type->typ_kobj);
356
357         RETURN(rc);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
360
361 /**
362  * Create a new obd device.
363  *
364  * Allocate the new obd_device and initialize it.
365  *
366  * \param[in] type_name obd device type string.
367  * \param[in] name      obd device name.
368  * \param[in] uuid      obd device UUID
369  *
370  * \retval newdev         pointer to created obd_device
371  * \retval ERR_PTR(errno) on error
372  */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
374                                 const char *uuid)
375 {
376         struct obd_device *newdev;
377         struct obd_type *type = NULL;
378         ENTRY;
379
380         if (strlen(name) >= MAX_OBD_NAME) {
381                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382                 RETURN(ERR_PTR(-EINVAL));
383         }
384
385         type = class_get_type(type_name);
386         if (type == NULL){
387                 CERROR("OBD: unknown type: %s\n", type_name);
388                 RETURN(ERR_PTR(-ENODEV));
389         }
390
391         newdev = obd_device_alloc();
392         if (newdev == NULL) {
393                 class_put_type(type);
394                 RETURN(ERR_PTR(-ENOMEM));
395         }
396         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398         newdev->obd_type = type;
399         newdev->obd_minor = -1;
400
401         rwlock_init(&newdev->obd_pool_lock);
402         newdev->obd_pool_limit = 0;
403         newdev->obd_pool_slv = 0;
404
405         INIT_LIST_HEAD(&newdev->obd_exports);
406         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408         INIT_LIST_HEAD(&newdev->obd_exports_timed);
409         INIT_LIST_HEAD(&newdev->obd_nid_stats);
410         spin_lock_init(&newdev->obd_nid_lock);
411         spin_lock_init(&newdev->obd_dev_lock);
412         mutex_init(&newdev->obd_dev_mutex);
413         spin_lock_init(&newdev->obd_osfs_lock);
414         /* newdev->obd_osfs_age must be set to a value in the distant
415          * past to guarantee a fresh statfs is fetched on mount. */
416         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
417
418         /* XXX belongs in setup not attach  */
419         init_rwsem(&newdev->obd_observer_link_sem);
420         /* recovery data */
421         spin_lock_init(&newdev->obd_recovery_task_lock);
422         init_waitqueue_head(&newdev->obd_next_transno_waitq);
423         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427         INIT_LIST_HEAD(&newdev->obd_evict_list);
428         INIT_LIST_HEAD(&newdev->obd_lwp_list);
429
430         llog_group_init(&newdev->obd_olg);
431         /* Detach drops this */
432         atomic_set(&newdev->obd_refcount, 1);
433         lu_ref_init(&newdev->obd_reference);
434         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
435
436         newdev->obd_conn_inprogress = 0;
437
438         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
439
440         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441                newdev->obd_name, newdev);
442
443         return newdev;
444 }
445
446 /**
447  * Free obd device.
448  *
449  * \param[in] obd obd_device to be freed
450  *
451  * \retval none
452  */
453 void class_free_dev(struct obd_device *obd)
454 {
455         struct obd_type *obd_type = obd->obd_type;
456
457         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460                  "obd %p != obd_devs[%d] %p\n",
461                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463                  "obd_refcount should be 0, not %d\n",
464                  atomic_read(&obd->obd_refcount));
465         LASSERT(obd_type != NULL);
466
467         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468                obd->obd_name, obd->obd_type->typ_name);
469
470         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471                          obd->obd_name, obd->obd_uuid.uuid);
472         if (obd->obd_stopping) {
473                 int err;
474
475                 /* If we're not stopping, we were never set up */
476                 err = obd_cleanup(obd);
477                 if (err)
478                         CERROR("Cleanup %s returned %d\n",
479                                 obd->obd_name, err);
480         }
481
482         obd_device_free(obd);
483
484         class_put_type(obd_type);
485 }
486
487 /**
488  * Unregister obd device.
489  *
490  * Free slot in obd_dev[] used by \a obd.
491  *
492  * \param[in] new_obd obd_device to be unregistered
493  *
494  * \retval none
495  */
496 void class_unregister_device(struct obd_device *obd)
497 {
498         write_lock(&obd_dev_lock);
499         if (obd->obd_minor >= 0) {
500                 LASSERT(obd_devs[obd->obd_minor] == obd);
501                 obd_devs[obd->obd_minor] = NULL;
502                 obd->obd_minor = -1;
503         }
504         write_unlock(&obd_dev_lock);
505 }
506
507 /**
508  * Register obd device.
509  *
510  * Find free slot in obd_devs[], fills it with \a new_obd.
511  *
512  * \param[in] new_obd obd_device to be registered
513  *
514  * \retval 0          success
515  * \retval -EEXIST    device with this name is registered
516  * \retval -EOVERFLOW obd_devs[] is full
517  */
518 int class_register_device(struct obd_device *new_obd)
519 {
520         int ret = 0;
521         int i;
522         int new_obd_minor = 0;
523         bool minor_assign = false;
524         bool retried = false;
525
526 again:
527         write_lock(&obd_dev_lock);
528         for (i = 0; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530
531                 if (obd != NULL &&
532                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
533
534                         if (!retried) {
535                                 write_unlock(&obd_dev_lock);
536
537                                 /* the obd_device could be waited to be
538                                  * destroyed by the "obd_zombie_impexp_thread".
539                                  */
540                                 obd_zombie_barrier();
541                                 retried = true;
542                                 goto again;
543                         }
544
545                         CERROR("%s: already exists, won't add\n",
546                                obd->obd_name);
547                         /* in case we found a free slot before duplicate */
548                         minor_assign = false;
549                         ret = -EEXIST;
550                         break;
551                 }
552                 if (!minor_assign && obd == NULL) {
553                         new_obd_minor = i;
554                         minor_assign = true;
555                 }
556         }
557
558         if (minor_assign) {
559                 new_obd->obd_minor = new_obd_minor;
560                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562                 obd_devs[new_obd_minor] = new_obd;
563         } else {
564                 if (ret == 0) {
565                         ret = -EOVERFLOW;
566                         CERROR("%s: all %u/%u devices used, increase "
567                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568                                i, class_devno_max(), ret);
569                 }
570         }
571         write_unlock(&obd_dev_lock);
572
573         RETURN(ret);
574 }
575
576 static int class_name2dev_nolock(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         for (i = 0; i < class_devno_max(); i++) {
584                 struct obd_device *obd = class_num2obd(i);
585
586                 if (obd && strcmp(name, obd->obd_name) == 0) {
587                         /* Make sure we finished attaching before we give
588                            out any references */
589                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590                         if (obd->obd_attached) {
591                                 return i;
592                         }
593                         break;
594                 }
595         }
596
597         return -1;
598 }
599
600 int class_name2dev(const char *name)
601 {
602         int i;
603
604         if (!name)
605                 return -1;
606
607         read_lock(&obd_dev_lock);
608         i = class_name2dev_nolock(name);
609         read_unlock(&obd_dev_lock);
610
611         return i;
612 }
613 EXPORT_SYMBOL(class_name2dev);
614
615 struct obd_device *class_name2obd(const char *name)
616 {
617         int dev = class_name2dev(name);
618
619         if (dev < 0 || dev > class_devno_max())
620                 return NULL;
621         return class_num2obd(dev);
622 }
623 EXPORT_SYMBOL(class_name2obd);
624
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
626 {
627         int i;
628
629         for (i = 0; i < class_devno_max(); i++) {
630                 struct obd_device *obd = class_num2obd(i);
631
632                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
634                         return i;
635                 }
636         }
637
638         return -1;
639 }
640
641 int class_uuid2dev(struct obd_uuid *uuid)
642 {
643         int i;
644
645         read_lock(&obd_dev_lock);
646         i = class_uuid2dev_nolock(uuid);
647         read_unlock(&obd_dev_lock);
648
649         return i;
650 }
651 EXPORT_SYMBOL(class_uuid2dev);
652
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
654 {
655         int dev = class_uuid2dev(uuid);
656         if (dev < 0)
657                 return NULL;
658         return class_num2obd(dev);
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 /**
663  * Get obd device from ::obd_devs[]
664  *
665  * \param num [in] array index
666  *
667  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668  *         otherwise return the obd device there.
669  */
670 struct obd_device *class_num2obd(int num)
671 {
672         struct obd_device *obd = NULL;
673
674         if (num < class_devno_max()) {
675                 obd = obd_devs[num];
676                 if (obd == NULL)
677                         return NULL;
678
679                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680                          "%p obd_magic %08x != %08x\n",
681                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682                 LASSERTF(obd->obd_minor == num,
683                          "%p obd_minor %0d != %0d\n",
684                          obd, obd->obd_minor, num);
685         }
686
687         return obd;
688 }
689
690 /**
691  * Find obd in obd_dev[] by name or uuid.
692  *
693  * Increment obd's refcount if found.
694  *
695  * \param[in] str obd name or uuid
696  *
697  * \retval NULL    if not found
698  * \retval target  pointer to found obd_device
699  */
700 struct obd_device *class_dev_by_str(const char *str)
701 {
702         struct obd_device *target = NULL;
703         struct obd_uuid tgtuuid;
704         int rc;
705
706         obd_str2uuid(&tgtuuid, str);
707
708         read_lock(&obd_dev_lock);
709         rc = class_uuid2dev_nolock(&tgtuuid);
710         if (rc < 0)
711                 rc = class_name2dev_nolock(str);
712
713         if (rc >= 0)
714                 target = class_num2obd(rc);
715
716         if (target != NULL)
717                 class_incref(target, "find", current);
718         read_unlock(&obd_dev_lock);
719
720         RETURN(target);
721 }
722 EXPORT_SYMBOL(class_dev_by_str);
723
724 /**
725  * Get obd devices count. Device in any
726  *    state are counted
727  * \retval obd device count
728  */
729 int get_devices_count(void)
730 {
731         int index, max_index = class_devno_max(), dev_count = 0;
732
733         read_lock(&obd_dev_lock);
734         for (index = 0; index <= max_index; index++) {
735                 struct obd_device *obd = class_num2obd(index);
736                 if (obd != NULL)
737                         dev_count++;
738         }
739         read_unlock(&obd_dev_lock);
740
741         return dev_count;
742 }
743 EXPORT_SYMBOL(get_devices_count);
744
745 void class_obd_list(void)
746 {
747         char *status;
748         int i;
749
750         read_lock(&obd_dev_lock);
751         for (i = 0; i < class_devno_max(); i++) {
752                 struct obd_device *obd = class_num2obd(i);
753
754                 if (obd == NULL)
755                         continue;
756                 if (obd->obd_stopping)
757                         status = "ST";
758                 else if (obd->obd_set_up)
759                         status = "UP";
760                 else if (obd->obd_attached)
761                         status = "AT";
762                 else
763                         status = "--";
764                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765                          i, status, obd->obd_type->typ_name,
766                          obd->obd_name, obd->obd_uuid.uuid,
767                          atomic_read(&obd->obd_refcount));
768         }
769         read_unlock(&obd_dev_lock);
770 }
771
772 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
773    specified, then only the client with that uuid is returned,
774    otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776                                           const char *type_name,
777                                           struct obd_uuid *grp_uuid)
778 {
779         int i;
780
781         read_lock(&obd_dev_lock);
782         for (i = 0; i < class_devno_max(); i++) {
783                 struct obd_device *obd = class_num2obd(i);
784
785                 if (obd == NULL)
786                         continue;
787                 if ((strncmp(obd->obd_type->typ_name, type_name,
788                              strlen(type_name)) == 0)) {
789                         if (obd_uuid_equals(tgt_uuid,
790                                             &obd->u.cli.cl_target_uuid) &&
791                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
792                                                          &obd->obd_uuid) : 1)) {
793                                 read_unlock(&obd_dev_lock);
794                                 return obd;
795                         }
796                 }
797         }
798         read_unlock(&obd_dev_lock);
799
800         return NULL;
801 }
802 EXPORT_SYMBOL(class_find_client_obd);
803
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805    searching at *next, and if a device is found, the next index to look
806    at is saved in *next. If next is NULL, then the first matching device
807    will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
809 {
810         int i;
811
812         if (next == NULL)
813                 i = 0;
814         else if (*next >= 0 && *next < class_devno_max())
815                 i = *next;
816         else
817                 return NULL;
818
819         read_lock(&obd_dev_lock);
820         for (; i < class_devno_max(); i++) {
821                 struct obd_device *obd = class_num2obd(i);
822
823                 if (obd == NULL)
824                         continue;
825                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
826                         if (next != NULL)
827                                 *next = i+1;
828                         read_unlock(&obd_dev_lock);
829                         return obd;
830                 }
831         }
832         read_unlock(&obd_dev_lock);
833
834         return NULL;
835 }
836 EXPORT_SYMBOL(class_devices_in_group);
837
838 /**
839  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840  * adjust sptlrpc settings accordingly.
841  */
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
843 {
844         struct obd_device  *obd;
845         const char         *type;
846         int                 i, rc = 0, rc2;
847
848         LASSERT(namelen > 0);
849
850         read_lock(&obd_dev_lock);
851         for (i = 0; i < class_devno_max(); i++) {
852                 obd = class_num2obd(i);
853
854                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
855                         continue;
856
857                 /* only notify mdc, osc, osp, lwp, mdt, ost
858                  * because only these have a -sptlrpc llog */
859                 type = obd->obd_type->typ_name;
860                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865                     strcmp(type, LUSTRE_OST_NAME) != 0)
866                         continue;
867
868                 if (strncmp(obd->obd_name, fsname, namelen))
869                         continue;
870
871                 class_incref(obd, __FUNCTION__, obd);
872                 read_unlock(&obd_dev_lock);
873                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874                                          sizeof(KEY_SPTLRPC_CONF),
875                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
876                 rc = rc ? rc : rc2;
877                 class_decref(obd, __FUNCTION__, obd);
878                 read_lock(&obd_dev_lock);
879         }
880         read_unlock(&obd_dev_lock);
881         return rc;
882 }
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
884
885 void obd_cleanup_caches(void)
886 {
887         ENTRY;
888         if (obd_device_cachep) {
889                 kmem_cache_destroy(obd_device_cachep);
890                 obd_device_cachep = NULL;
891         }
892
893         EXIT;
894 }
895
896 int obd_init_caches(void)
897 {
898         int rc;
899         ENTRY;
900
901         LASSERT(obd_device_cachep == NULL);
902         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903                                 sizeof(struct obd_device),
904                                 0, 0, 0, sizeof(struct obd_device), NULL);
905         if (!obd_device_cachep)
906                 GOTO(out, rc = -ENOMEM);
907
908         RETURN(0);
909 out:
910         obd_cleanup_caches();
911         RETURN(rc);
912 }
913
914 static struct portals_handle_ops export_handle_ops;
915
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
918 {
919         struct obd_export *export;
920         ENTRY;
921
922         if (!conn) {
923                 CDEBUG(D_CACHE, "looking for null handle\n");
924                 RETURN(NULL);
925         }
926
927         if (conn->cookie == -1) {  /* this means assign a new connection */
928                 CDEBUG(D_CACHE, "want a new connection\n");
929                 RETURN(NULL);
930         }
931
932         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933         export = class_handle2object(conn->cookie, &export_handle_ops);
934         RETURN(export);
935 }
936 EXPORT_SYMBOL(class_conn2export);
937
938 struct obd_device *class_exp2obd(struct obd_export *exp)
939 {
940         if (exp)
941                 return exp->exp_obd;
942         return NULL;
943 }
944 EXPORT_SYMBOL(class_exp2obd);
945
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
947 {
948         struct obd_device *obd = exp->exp_obd;
949         if (obd == NULL)
950                 return NULL;
951         return obd->u.cli.cl_import;
952 }
953 EXPORT_SYMBOL(class_exp2cliimp);
954
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
957 {
958         struct obd_device *obd = exp->exp_obd;
959         ENTRY;
960
961         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962         LASSERT(obd != NULL);
963
964         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965                exp->exp_client_uuid.uuid, obd->obd_name);
966
967         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968         if (exp->exp_connection)
969                 ptlrpc_put_connection_superhack(exp->exp_connection);
970
971         LASSERT(list_empty(&exp->exp_outstanding_replies));
972         LASSERT(list_empty(&exp->exp_uncommitted_replies));
973         LASSERT(list_empty(&exp->exp_req_replay_queue));
974         LASSERT(list_empty(&exp->exp_hp_rpcs));
975         obd_destroy_export(exp);
976         /* self export doesn't hold a reference to an obd, although it
977          * exists until freeing of the obd */
978         if (exp != obd->obd_self_export)
979                 class_decref(obd, "export", exp);
980
981         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
982         EXIT;
983 }
984
985 static struct portals_handle_ops export_handle_ops = {
986         .hop_free   = NULL,
987         .hop_type       = "export",
988 };
989
990 struct obd_export *class_export_get(struct obd_export *exp)
991 {
992         refcount_inc(&exp->exp_handle.h_ref);
993         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994                refcount_read(&exp->exp_handle.h_ref));
995         return exp;
996 }
997 EXPORT_SYMBOL(class_export_get);
998
999 void class_export_put(struct obd_export *exp)
1000 {
1001         LASSERT(exp != NULL);
1002         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
1003         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005                refcount_read(&exp->exp_handle.h_ref) - 1);
1006
1007         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008                 struct obd_device *obd = exp->exp_obd;
1009
1010                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011                        exp, exp->exp_client_uuid.uuid);
1012
1013                 /* release nid stat refererence */
1014                 lprocfs_exp_cleanup(exp);
1015
1016                 if (exp == obd->obd_self_export) {
1017                         /* self export should be destroyed without
1018                          * zombie thread as it doesn't hold a
1019                          * reference to obd and doesn't hold any
1020                          * resources */
1021                         class_export_destroy(exp);
1022                         /* self export is destroyed, no class
1023                          * references exist and it is safe to free
1024                          * obd */
1025                         class_free_dev(obd);
1026                 } else {
1027                         LASSERT(!list_empty(&exp->exp_obd_chain));
1028                         obd_zombie_export_add(exp);
1029                 }
1030
1031         }
1032 }
1033 EXPORT_SYMBOL(class_export_put);
1034
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1036 {
1037         struct obd_export *export;
1038
1039         export = container_of(ws, struct obd_export, exp_zombie_work);
1040         class_export_destroy(export);
1041 }
1042
1043 /* Creates a new export, adds it to the hash table, and returns a
1044  * pointer to it. The refcount is 2: one for the hash reference, and
1045  * one for the pointer returned by this function. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047                                       struct obd_uuid *cluuid, bool is_self)
1048 {
1049         struct obd_export *export;
1050         int rc = 0;
1051         ENTRY;
1052
1053         OBD_ALLOC_PTR(export);
1054         if (!export)
1055                 return ERR_PTR(-ENOMEM);
1056
1057         export->exp_conn_cnt = 0;
1058         export->exp_lock_hash = NULL;
1059         export->exp_flock_hash = NULL;
1060         /* 2 = class_handle_hash + last */
1061         refcount_set(&export->exp_handle.h_ref, 2);
1062         atomic_set(&export->exp_rpc_count, 0);
1063         atomic_set(&export->exp_cb_count, 0);
1064         atomic_set(&export->exp_locks_count, 0);
1065 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1066         INIT_LIST_HEAD(&export->exp_locks_list);
1067         spin_lock_init(&export->exp_locks_list_guard);
1068 #endif
1069         atomic_set(&export->exp_replay_count, 0);
1070         export->exp_obd = obd;
1071         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1072         spin_lock_init(&export->exp_uncommitted_replies_lock);
1073         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1074         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1075         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1076         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1077         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1078         class_handle_hash(&export->exp_handle, &export_handle_ops);
1079         export->exp_last_request_time = ktime_get_real_seconds();
1080         spin_lock_init(&export->exp_lock);
1081         spin_lock_init(&export->exp_rpc_lock);
1082         INIT_HLIST_NODE(&export->exp_nid_hash);
1083         INIT_HLIST_NODE(&export->exp_gen_hash);
1084         spin_lock_init(&export->exp_bl_list_lock);
1085         INIT_LIST_HEAD(&export->exp_bl_list);
1086         INIT_LIST_HEAD(&export->exp_stale_list);
1087         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1088
1089         export->exp_sp_peer = LUSTRE_SP_ANY;
1090         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1091         export->exp_client_uuid = *cluuid;
1092         obd_init_export(export);
1093
1094         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1095
1096         spin_lock(&obd->obd_dev_lock);
1097         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1098                 /* shouldn't happen, but might race */
1099                 if (obd->obd_stopping)
1100                         GOTO(exit_unlock, rc = -ENODEV);
1101
1102                 rc = obd_uuid_add(obd, export);
1103                 if (rc != 0) {
1104                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1105                                       obd->obd_name, cluuid->uuid, rc);
1106                         GOTO(exit_unlock, rc = -EALREADY);
1107                 }
1108         }
1109
1110         if (!is_self) {
1111                 class_incref(obd, "export", export);
1112                 list_add_tail(&export->exp_obd_chain_timed,
1113                               &obd->obd_exports_timed);
1114                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1115                 obd->obd_num_exports++;
1116         } else {
1117                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1118                 INIT_LIST_HEAD(&export->exp_obd_chain);
1119         }
1120         spin_unlock(&obd->obd_dev_lock);
1121         RETURN(export);
1122
1123 exit_unlock:
1124         spin_unlock(&obd->obd_dev_lock);
1125         class_handle_unhash(&export->exp_handle);
1126         obd_destroy_export(export);
1127         OBD_FREE_PTR(export);
1128         return ERR_PTR(rc);
1129 }
1130
1131 struct obd_export *class_new_export(struct obd_device *obd,
1132                                     struct obd_uuid *uuid)
1133 {
1134         return __class_new_export(obd, uuid, false);
1135 }
1136 EXPORT_SYMBOL(class_new_export);
1137
1138 struct obd_export *class_new_export_self(struct obd_device *obd,
1139                                          struct obd_uuid *uuid)
1140 {
1141         return __class_new_export(obd, uuid, true);
1142 }
1143
1144 void class_unlink_export(struct obd_export *exp)
1145 {
1146         class_handle_unhash(&exp->exp_handle);
1147
1148         if (exp->exp_obd->obd_self_export == exp) {
1149                 class_export_put(exp);
1150                 return;
1151         }
1152
1153         spin_lock(&exp->exp_obd->obd_dev_lock);
1154         /* delete an uuid-export hashitem from hashtables */
1155         if (exp != exp->exp_obd->obd_self_export)
1156                 obd_uuid_del(exp->exp_obd, exp);
1157
1158 #ifdef HAVE_SERVER_SUPPORT
1159         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1160                 struct tg_export_data   *ted = &exp->exp_target_data;
1161                 struct cfs_hash         *hash;
1162
1163                 /* Because obd_gen_hash will not be released until
1164                  * class_cleanup(), so hash should never be NULL here */
1165                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1166                 LASSERT(hash != NULL);
1167                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1168                              &exp->exp_gen_hash);
1169                 cfs_hash_putref(hash);
1170         }
1171 #endif /* HAVE_SERVER_SUPPORT */
1172
1173         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1174         list_del_init(&exp->exp_obd_chain_timed);
1175         exp->exp_obd->obd_num_exports--;
1176         spin_unlock(&exp->exp_obd->obd_dev_lock);
1177         atomic_inc(&obd_stale_export_num);
1178
1179         /* A reference is kept by obd_stale_exports list */
1180         obd_stale_export_put(exp);
1181 }
1182 EXPORT_SYMBOL(class_unlink_export);
1183
1184 /* Import management functions */
1185 static void obd_zombie_import_free(struct obd_import *imp)
1186 {
1187         ENTRY;
1188
1189         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1190                 imp->imp_obd->obd_name);
1191
1192         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1193
1194         ptlrpc_put_connection_superhack(imp->imp_connection);
1195
1196         while (!list_empty(&imp->imp_conn_list)) {
1197                 struct obd_import_conn *imp_conn;
1198
1199                 imp_conn = list_entry(imp->imp_conn_list.next,
1200                                       struct obd_import_conn, oic_item);
1201                 list_del_init(&imp_conn->oic_item);
1202                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1203                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1204         }
1205
1206         LASSERT(imp->imp_sec == NULL);
1207         class_decref(imp->imp_obd, "import", imp);
1208         OBD_FREE_PTR(imp);
1209         EXIT;
1210 }
1211
1212 struct obd_import *class_import_get(struct obd_import *import)
1213 {
1214         atomic_inc(&import->imp_refcount);
1215         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1216                atomic_read(&import->imp_refcount),
1217                import->imp_obd->obd_name);
1218         return import;
1219 }
1220 EXPORT_SYMBOL(class_import_get);
1221
1222 void class_import_put(struct obd_import *imp)
1223 {
1224         ENTRY;
1225
1226         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1227
1228         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1229                atomic_read(&imp->imp_refcount) - 1,
1230                imp->imp_obd->obd_name);
1231
1232         if (atomic_dec_and_test(&imp->imp_refcount)) {
1233                 CDEBUG(D_INFO, "final put import %p\n", imp);
1234                 obd_zombie_import_add(imp);
1235         }
1236
1237         EXIT;
1238 }
1239 EXPORT_SYMBOL(class_import_put);
1240
1241 static void init_imp_at(struct imp_at *at) {
1242         int i;
1243         at_init(&at->iat_net_latency, 0, 0);
1244         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1245                 /* max service estimates are tracked on the server side, so
1246                    don't use the AT history here, just use the last reported
1247                    val. (But keep hist for proc histogram, worst_ever) */
1248                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1249                         AT_FLG_NOHIST);
1250         }
1251 }
1252
1253 static void obd_zombie_imp_cull(struct work_struct *ws)
1254 {
1255         struct obd_import *import;
1256
1257         import = container_of(ws, struct obd_import, imp_zombie_work);
1258         obd_zombie_import_free(import);
1259 }
1260
1261 struct obd_import *class_new_import(struct obd_device *obd)
1262 {
1263         struct obd_import *imp;
1264         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1265
1266         OBD_ALLOC(imp, sizeof(*imp));
1267         if (imp == NULL)
1268                 return NULL;
1269
1270         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1271         INIT_LIST_HEAD(&imp->imp_replay_list);
1272         INIT_LIST_HEAD(&imp->imp_sending_list);
1273         INIT_LIST_HEAD(&imp->imp_delayed_list);
1274         INIT_LIST_HEAD(&imp->imp_committed_list);
1275         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1276         imp->imp_known_replied_xid = 0;
1277         imp->imp_replay_cursor = &imp->imp_committed_list;
1278         spin_lock_init(&imp->imp_lock);
1279         imp->imp_last_success_conn = 0;
1280         imp->imp_state = LUSTRE_IMP_NEW;
1281         imp->imp_obd = class_incref(obd, "import", imp);
1282         rwlock_init(&imp->imp_sec_lock);
1283         init_waitqueue_head(&imp->imp_recovery_waitq);
1284         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1285
1286         if (curr_pid_ns->child_reaper)
1287                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1288         else
1289                 imp->imp_sec_refpid = 1;
1290
1291         atomic_set(&imp->imp_refcount, 2);
1292         atomic_set(&imp->imp_unregistering, 0);
1293         atomic_set(&imp->imp_inflight, 0);
1294         atomic_set(&imp->imp_replay_inflight, 0);
1295         atomic_set(&imp->imp_inval_count, 0);
1296         INIT_LIST_HEAD(&imp->imp_conn_list);
1297         init_imp_at(&imp->imp_at);
1298
1299         /* the default magic is V2, will be used in connect RPC, and
1300          * then adjusted according to the flags in request/reply. */
1301         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1302
1303         return imp;
1304 }
1305 EXPORT_SYMBOL(class_new_import);
1306
1307 void class_destroy_import(struct obd_import *import)
1308 {
1309         LASSERT(import != NULL);
1310         LASSERT(import != LP_POISON);
1311
1312         spin_lock(&import->imp_lock);
1313         import->imp_generation++;
1314         spin_unlock(&import->imp_lock);
1315         class_import_put(import);
1316 }
1317 EXPORT_SYMBOL(class_destroy_import);
1318
1319 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1320
1321 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1322 {
1323         spin_lock(&exp->exp_locks_list_guard);
1324
1325         LASSERT(lock->l_exp_refs_nr >= 0);
1326
1327         if (lock->l_exp_refs_target != NULL &&
1328             lock->l_exp_refs_target != exp) {
1329                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1330                               exp, lock, lock->l_exp_refs_target);
1331         }
1332         if ((lock->l_exp_refs_nr ++) == 0) {
1333                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1334                 lock->l_exp_refs_target = exp;
1335         }
1336         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1337                lock, exp, lock->l_exp_refs_nr);
1338         spin_unlock(&exp->exp_locks_list_guard);
1339 }
1340 EXPORT_SYMBOL(__class_export_add_lock_ref);
1341
1342 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1343 {
1344         spin_lock(&exp->exp_locks_list_guard);
1345         LASSERT(lock->l_exp_refs_nr > 0);
1346         if (lock->l_exp_refs_target != exp) {
1347                 LCONSOLE_WARN("lock %p, "
1348                               "mismatching export pointers: %p, %p\n",
1349                               lock, lock->l_exp_refs_target, exp);
1350         }
1351         if (-- lock->l_exp_refs_nr == 0) {
1352                 list_del_init(&lock->l_exp_refs_link);
1353                 lock->l_exp_refs_target = NULL;
1354         }
1355         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1356                lock, exp, lock->l_exp_refs_nr);
1357         spin_unlock(&exp->exp_locks_list_guard);
1358 }
1359 EXPORT_SYMBOL(__class_export_del_lock_ref);
1360 #endif
1361
1362 /* A connection defines an export context in which preallocation can
1363    be managed. This releases the export pointer reference, and returns
1364    the export handle, so the export refcount is 1 when this function
1365    returns. */
1366 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1367                   struct obd_uuid *cluuid)
1368 {
1369         struct obd_export *export;
1370         LASSERT(conn != NULL);
1371         LASSERT(obd != NULL);
1372         LASSERT(cluuid != NULL);
1373         ENTRY;
1374
1375         export = class_new_export(obd, cluuid);
1376         if (IS_ERR(export))
1377                 RETURN(PTR_ERR(export));
1378
1379         conn->cookie = export->exp_handle.h_cookie;
1380         class_export_put(export);
1381
1382         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1383                cluuid->uuid, conn->cookie);
1384         RETURN(0);
1385 }
1386 EXPORT_SYMBOL(class_connect);
1387
1388 /* if export is involved in recovery then clean up related things */
1389 static void class_export_recovery_cleanup(struct obd_export *exp)
1390 {
1391         struct obd_device *obd = exp->exp_obd;
1392
1393         spin_lock(&obd->obd_recovery_task_lock);
1394         if (obd->obd_recovering) {
1395                 if (exp->exp_in_recovery) {
1396                         spin_lock(&exp->exp_lock);
1397                         exp->exp_in_recovery = 0;
1398                         spin_unlock(&exp->exp_lock);
1399                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1400                         atomic_dec(&obd->obd_connected_clients);
1401                 }
1402
1403                 /* if called during recovery then should update
1404                  * obd_stale_clients counter,
1405                  * lightweight exports are not counted */
1406                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1407                         exp->exp_obd->obd_stale_clients++;
1408         }
1409         spin_unlock(&obd->obd_recovery_task_lock);
1410
1411         spin_lock(&exp->exp_lock);
1412         /** Cleanup req replay fields */
1413         if (exp->exp_req_replay_needed) {
1414                 exp->exp_req_replay_needed = 0;
1415
1416                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1417                 atomic_dec(&obd->obd_req_replay_clients);
1418         }
1419
1420         /** Cleanup lock replay data */
1421         if (exp->exp_lock_replay_needed) {
1422                 exp->exp_lock_replay_needed = 0;
1423
1424                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1425                 atomic_dec(&obd->obd_lock_replay_clients);
1426         }
1427         spin_unlock(&exp->exp_lock);
1428 }
1429
1430 /* This function removes 1-3 references from the export:
1431  * 1 - for export pointer passed
1432  * and if disconnect really need
1433  * 2 - removing from hash
1434  * 3 - in client_unlink_export
1435  * The export pointer passed to this function can destroyed */
1436 int class_disconnect(struct obd_export *export)
1437 {
1438         int already_disconnected;
1439         ENTRY;
1440
1441         if (export == NULL) {
1442                 CWARN("attempting to free NULL export %p\n", export);
1443                 RETURN(-EINVAL);
1444         }
1445
1446         spin_lock(&export->exp_lock);
1447         already_disconnected = export->exp_disconnected;
1448         export->exp_disconnected = 1;
1449         /*  We hold references of export for uuid hash
1450          *  and nid_hash and export link at least. So
1451          *  it is safe to call cfs_hash_del in there.  */
1452         if (!hlist_unhashed(&export->exp_nid_hash))
1453                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1454                              &export->exp_connection->c_peer.nid,
1455                              &export->exp_nid_hash);
1456         spin_unlock(&export->exp_lock);
1457
1458         /* class_cleanup(), abort_recovery(), and class_fail_export()
1459          * all end up in here, and if any of them race we shouldn't
1460          * call extra class_export_puts(). */
1461         if (already_disconnected) {
1462                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1463                 GOTO(no_disconn, already_disconnected);
1464         }
1465
1466         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1467                export->exp_handle.h_cookie);
1468
1469         class_export_recovery_cleanup(export);
1470         class_unlink_export(export);
1471 no_disconn:
1472         class_export_put(export);
1473         RETURN(0);
1474 }
1475 EXPORT_SYMBOL(class_disconnect);
1476
1477 /* Return non-zero for a fully connected export */
1478 int class_connected_export(struct obd_export *exp)
1479 {
1480         int connected = 0;
1481
1482         if (exp) {
1483                 spin_lock(&exp->exp_lock);
1484                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1485                 spin_unlock(&exp->exp_lock);
1486         }
1487         return connected;
1488 }
1489 EXPORT_SYMBOL(class_connected_export);
1490
1491 static void class_disconnect_export_list(struct list_head *list,
1492                                          enum obd_option flags)
1493 {
1494         int rc;
1495         struct obd_export *exp;
1496         ENTRY;
1497
1498         /* It's possible that an export may disconnect itself, but
1499          * nothing else will be added to this list. */
1500         while (!list_empty(list)) {
1501                 exp = list_entry(list->next, struct obd_export,
1502                                  exp_obd_chain);
1503                 /* need for safe call CDEBUG after obd_disconnect */
1504                 class_export_get(exp);
1505
1506                 spin_lock(&exp->exp_lock);
1507                 exp->exp_flags = flags;
1508                 spin_unlock(&exp->exp_lock);
1509
1510                 if (obd_uuid_equals(&exp->exp_client_uuid,
1511                                     &exp->exp_obd->obd_uuid)) {
1512                         CDEBUG(D_HA,
1513                                "exp %p export uuid == obd uuid, don't discon\n",
1514                                exp);
1515                         /* Need to delete this now so we don't end up pointing
1516                          * to work_list later when this export is cleaned up. */
1517                         list_del_init(&exp->exp_obd_chain);
1518                         class_export_put(exp);
1519                         continue;
1520                 }
1521
1522                 class_export_get(exp);
1523                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1524                        "last request at %lld\n",
1525                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1526                        exp, exp->exp_last_request_time);
1527                 /* release one export reference anyway */
1528                 rc = obd_disconnect(exp);
1529
1530                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1531                        obd_export_nid2str(exp), exp, rc);
1532                 class_export_put(exp);
1533         }
1534         EXIT;
1535 }
1536
1537 void class_disconnect_exports(struct obd_device *obd)
1538 {
1539         struct list_head work_list;
1540         ENTRY;
1541
1542         /* Move all of the exports from obd_exports to a work list, en masse. */
1543         INIT_LIST_HEAD(&work_list);
1544         spin_lock(&obd->obd_dev_lock);
1545         list_splice_init(&obd->obd_exports, &work_list);
1546         list_splice_init(&obd->obd_delayed_exports, &work_list);
1547         spin_unlock(&obd->obd_dev_lock);
1548
1549         if (!list_empty(&work_list)) {
1550                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1551                        "disconnecting them\n", obd->obd_minor, obd);
1552                 class_disconnect_export_list(&work_list,
1553                                              exp_flags_from_obd(obd));
1554         } else
1555                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1556                        obd->obd_minor, obd);
1557         EXIT;
1558 }
1559 EXPORT_SYMBOL(class_disconnect_exports);
1560
1561 /* Remove exports that have not completed recovery.
1562  */
1563 void class_disconnect_stale_exports(struct obd_device *obd,
1564                                     int (*test_export)(struct obd_export *))
1565 {
1566         struct list_head work_list;
1567         struct obd_export *exp, *n;
1568         int evicted = 0;
1569         ENTRY;
1570
1571         INIT_LIST_HEAD(&work_list);
1572         spin_lock(&obd->obd_dev_lock);
1573         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1574                                  exp_obd_chain) {
1575                 /* don't count self-export as client */
1576                 if (obd_uuid_equals(&exp->exp_client_uuid,
1577                                     &exp->exp_obd->obd_uuid))
1578                         continue;
1579
1580                 /* don't evict clients which have no slot in last_rcvd
1581                  * (e.g. lightweight connection) */
1582                 if (exp->exp_target_data.ted_lr_idx == -1)
1583                         continue;
1584
1585                 spin_lock(&exp->exp_lock);
1586                 if (exp->exp_failed || test_export(exp)) {
1587                         spin_unlock(&exp->exp_lock);
1588                         continue;
1589                 }
1590                 exp->exp_failed = 1;
1591                 spin_unlock(&exp->exp_lock);
1592
1593                 list_move(&exp->exp_obd_chain, &work_list);
1594                 evicted++;
1595                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1596                        obd->obd_name, exp->exp_client_uuid.uuid,
1597                        obd_export_nid2str(exp));
1598                 print_export_data(exp, "EVICTING", 0, D_HA);
1599         }
1600         spin_unlock(&obd->obd_dev_lock);
1601
1602         if (evicted)
1603                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1604                               obd->obd_name, evicted);
1605
1606         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1607                                                  OBD_OPT_ABORT_RECOV);
1608         EXIT;
1609 }
1610 EXPORT_SYMBOL(class_disconnect_stale_exports);
1611
1612 void class_fail_export(struct obd_export *exp)
1613 {
1614         int rc, already_failed;
1615
1616         spin_lock(&exp->exp_lock);
1617         already_failed = exp->exp_failed;
1618         exp->exp_failed = 1;
1619         spin_unlock(&exp->exp_lock);
1620
1621         if (already_failed) {
1622                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1623                        exp, exp->exp_client_uuid.uuid);
1624                 return;
1625         }
1626
1627         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1628                exp, exp->exp_client_uuid.uuid);
1629
1630         if (obd_dump_on_timeout)
1631                 libcfs_debug_dumplog();
1632
1633         /* need for safe call CDEBUG after obd_disconnect */
1634         class_export_get(exp);
1635
1636         /* Most callers into obd_disconnect are removing their own reference
1637          * (request, for example) in addition to the one from the hash table.
1638          * We don't have such a reference here, so make one. */
1639         class_export_get(exp);
1640         rc = obd_disconnect(exp);
1641         if (rc)
1642                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1643         else
1644                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1645                        exp, exp->exp_client_uuid.uuid);
1646         class_export_put(exp);
1647 }
1648 EXPORT_SYMBOL(class_fail_export);
1649
1650 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1651 {
1652         struct cfs_hash *nid_hash;
1653         struct obd_export *doomed_exp = NULL;
1654         int exports_evicted = 0;
1655
1656         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1657
1658         spin_lock(&obd->obd_dev_lock);
1659         /* umount has run already, so evict thread should leave
1660          * its task to umount thread now */
1661         if (obd->obd_stopping) {
1662                 spin_unlock(&obd->obd_dev_lock);
1663                 return exports_evicted;
1664         }
1665         nid_hash = obd->obd_nid_hash;
1666         cfs_hash_getref(nid_hash);
1667         spin_unlock(&obd->obd_dev_lock);
1668
1669         do {
1670                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1671                 if (doomed_exp == NULL)
1672                         break;
1673
1674                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1675                          "nid %s found, wanted nid %s, requested nid %s\n",
1676                          obd_export_nid2str(doomed_exp),
1677                          libcfs_nid2str(nid_key), nid);
1678                 LASSERTF(doomed_exp != obd->obd_self_export,
1679                          "self-export is hashed by NID?\n");
1680                 exports_evicted++;
1681                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1682                               "request\n", obd->obd_name,
1683                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1684                               obd_export_nid2str(doomed_exp));
1685                 class_fail_export(doomed_exp);
1686                 class_export_put(doomed_exp);
1687         } while (1);
1688
1689         cfs_hash_putref(nid_hash);
1690
1691         if (!exports_evicted)
1692                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1693                        obd->obd_name, nid);
1694         return exports_evicted;
1695 }
1696 EXPORT_SYMBOL(obd_export_evict_by_nid);
1697
1698 #ifdef HAVE_SERVER_SUPPORT
1699 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1700 {
1701         struct obd_export *doomed_exp = NULL;
1702         struct obd_uuid doomed_uuid;
1703         int exports_evicted = 0;
1704
1705         spin_lock(&obd->obd_dev_lock);
1706         if (obd->obd_stopping) {
1707                 spin_unlock(&obd->obd_dev_lock);
1708                 return exports_evicted;
1709         }
1710         spin_unlock(&obd->obd_dev_lock);
1711
1712         obd_str2uuid(&doomed_uuid, uuid);
1713         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1714                 CERROR("%s: can't evict myself\n", obd->obd_name);
1715                 return exports_evicted;
1716         }
1717
1718         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1719         if (doomed_exp == NULL) {
1720                 CERROR("%s: can't disconnect %s: no exports found\n",
1721                        obd->obd_name, uuid);
1722         } else {
1723                 CWARN("%s: evicting %s at adminstrative request\n",
1724                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1725                 class_fail_export(doomed_exp);
1726                 class_export_put(doomed_exp);
1727                 obd_uuid_del(obd, doomed_exp);
1728                 exports_evicted++;
1729         }
1730
1731         return exports_evicted;
1732 }
1733 #endif /* HAVE_SERVER_SUPPORT */
1734
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; print_export_data() invokes it for an export when
 * lock dumping is requested and the hook is non-NULL. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1739
1740 static void print_export_data(struct obd_export *exp, const char *status,
1741                               int locks, int debug_level)
1742 {
1743         struct ptlrpc_reply_state *rs;
1744         struct ptlrpc_reply_state *first_reply = NULL;
1745         int nreplies = 0;
1746
1747         spin_lock(&exp->exp_lock);
1748         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1749                             rs_exp_list) {
1750                 if (nreplies == 0)
1751                         first_reply = rs;
1752                 nreplies++;
1753         }
1754         spin_unlock(&exp->exp_lock);
1755
1756         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1757                "%p %s %llu stale:%d\n",
1758                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1759                obd_export_nid2str(exp),
1760                refcount_read(&exp->exp_handle.h_ref),
1761                atomic_read(&exp->exp_rpc_count),
1762                atomic_read(&exp->exp_cb_count),
1763                atomic_read(&exp->exp_locks_count),
1764                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1765                nreplies, first_reply, nreplies > 3 ? "..." : "",
1766                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1767 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1768         if (locks && class_export_dump_hook != NULL)
1769                 class_export_dump_hook(exp);
1770 #endif
1771 }
1772
1773 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1774 {
1775         struct obd_export *exp;
1776
1777         spin_lock(&obd->obd_dev_lock);
1778         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1779                 print_export_data(exp, "ACTIVE", locks, debug_level);
1780         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1781                 print_export_data(exp, "UNLINKED", locks, debug_level);
1782         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1783                 print_export_data(exp, "DELAYED", locks, debug_level);
1784         spin_unlock(&obd->obd_dev_lock);
1785 }
1786
1787 void obd_exports_barrier(struct obd_device *obd)
1788 {
1789         int waited = 2;
1790         LASSERT(list_empty(&obd->obd_exports));
1791         spin_lock(&obd->obd_dev_lock);
1792         while (!list_empty(&obd->obd_unlinked_exports)) {
1793                 spin_unlock(&obd->obd_dev_lock);
1794                 set_current_state(TASK_UNINTERRUPTIBLE);
1795                 schedule_timeout(cfs_time_seconds(waited));
1796                 if (waited > 5 && is_power_of_2(waited)) {
1797                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1798                                       "more than %d seconds. "
1799                                       "The obd refcount = %d. Is it stuck?\n",
1800                                       obd->obd_name, waited,
1801                                       atomic_read(&obd->obd_refcount));
1802                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1803                 }
1804                 waited *= 2;
1805                 spin_lock(&obd->obd_dev_lock);
1806         }
1807         spin_unlock(&obd->obd_dev_lock);
1808 }
1809 EXPORT_SYMBOL(obd_exports_barrier);
1810
1811 /**
1812  * Add export to the obd_zombe thread and notify it.
1813  */
1814 static void obd_zombie_export_add(struct obd_export *exp) {
1815         atomic_dec(&obd_stale_export_num);
1816         spin_lock(&exp->exp_obd->obd_dev_lock);
1817         LASSERT(!list_empty(&exp->exp_obd_chain));
1818         list_del_init(&exp->exp_obd_chain);
1819         spin_unlock(&exp->exp_obd->obd_dev_lock);
1820
1821         queue_work(zombie_wq, &exp->exp_zombie_work);
1822 }
1823
1824 /**
1825  * Add import to the obd_zombe thread and notify it.
1826  */
1827 static void obd_zombie_import_add(struct obd_import *imp) {
1828         LASSERT(imp->imp_sec == NULL);
1829
1830         queue_work(zombie_wq, &imp->imp_zombie_work);
1831 }
1832
1833 /**
1834  * wait when obd_zombie import/export queues become empty
1835  */
1836 void obd_zombie_barrier(void)
1837 {
1838         flush_workqueue(zombie_wq);
1839 }
1840 EXPORT_SYMBOL(obd_zombie_barrier);
1841
1842
1843 struct obd_export *obd_stale_export_get(void)
1844 {
1845         struct obd_export *exp = NULL;
1846         ENTRY;
1847
1848         spin_lock(&obd_stale_export_lock);
1849         if (!list_empty(&obd_stale_exports)) {
1850                 exp = list_entry(obd_stale_exports.next,
1851                                  struct obd_export, exp_stale_list);
1852                 list_del_init(&exp->exp_stale_list);
1853         }
1854         spin_unlock(&obd_stale_export_lock);
1855
1856         if (exp) {
1857                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1858                        atomic_read(&obd_stale_export_num));
1859         }
1860         RETURN(exp);
1861 }
1862 EXPORT_SYMBOL(obd_stale_export_get);
1863
1864 void obd_stale_export_put(struct obd_export *exp)
1865 {
1866         ENTRY;
1867
1868         LASSERT(list_empty(&exp->exp_stale_list));
1869         if (exp->exp_lock_hash &&
1870             atomic_read(&exp->exp_lock_hash->hs_count)) {
1871                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1872                        atomic_read(&obd_stale_export_num));
1873
1874                 spin_lock_bh(&exp->exp_bl_list_lock);
1875                 spin_lock(&obd_stale_export_lock);
1876                 /* Add to the tail if there is no blocked locks,
1877                  * to the head otherwise. */
1878                 if (list_empty(&exp->exp_bl_list))
1879                         list_add_tail(&exp->exp_stale_list,
1880                                       &obd_stale_exports);
1881                 else
1882                         list_add(&exp->exp_stale_list,
1883                                  &obd_stale_exports);
1884
1885                 spin_unlock(&obd_stale_export_lock);
1886                 spin_unlock_bh(&exp->exp_bl_list_lock);
1887         } else {
1888                 class_export_put(exp);
1889         }
1890         EXIT;
1891 }
1892 EXPORT_SYMBOL(obd_stale_export_put);
1893
1894 /**
1895  * Adjust the position of the export in the stale list,
1896  * i.e. move to the head of the list if is needed.
1897  **/
1898 void obd_stale_export_adjust(struct obd_export *exp)
1899 {
1900         LASSERT(exp != NULL);
1901         spin_lock_bh(&exp->exp_bl_list_lock);
1902         spin_lock(&obd_stale_export_lock);
1903
1904         if (!list_empty(&exp->exp_stale_list) &&
1905             !list_empty(&exp->exp_bl_list))
1906                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1907
1908         spin_unlock(&obd_stale_export_lock);
1909         spin_unlock_bh(&exp->exp_bl_list_lock);
1910 }
1911 EXPORT_SYMBOL(obd_stale_export_adjust);
1912
1913 /**
1914  * start destroy zombie import/export thread
1915  */
1916 int obd_zombie_impexp_init(void)
1917 {
1918         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1919         if (!zombie_wq)
1920                 return -ENOMEM;
1921
1922         return 0;
1923 }
1924
1925 /**
1926  * stop destroy zombie import/export thread
1927  */
1928 void obd_zombie_impexp_stop(void)
1929 {
1930         destroy_workqueue(zombie_wq);
1931         LASSERT(list_empty(&obd_stale_exports));
1932 }
1933
1934 /***** Kernel-userspace comm helpers *******/
1935
1936 /* Get length of entire message, including header */
1937 int kuc_len(int payload_len)
1938 {
1939         return sizeof(struct kuc_hdr) + payload_len;
1940 }
1941 EXPORT_SYMBOL(kuc_len);
1942
1943 /* Get a pointer to kuc header, given a ptr to the payload
1944  * @param p Pointer to payload area
1945  * @returns Pointer to kuc header
1946  */
1947 struct kuc_hdr * kuc_ptr(void *p)
1948 {
1949         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1950         LASSERT(lh->kuc_magic == KUC_MAGIC);
1951         return lh;
1952 }
1953 EXPORT_SYMBOL(kuc_ptr);
1954
1955 /* Alloc space for a message, and fill in header
1956  * @return Pointer to payload area
1957  */
1958 void *kuc_alloc(int payload_len, int transport, int type)
1959 {
1960         struct kuc_hdr *lh;
1961         int len = kuc_len(payload_len);
1962
1963         OBD_ALLOC(lh, len);
1964         if (lh == NULL)
1965                 return ERR_PTR(-ENOMEM);
1966
1967         lh->kuc_magic = KUC_MAGIC;
1968         lh->kuc_transport = transport;
1969         lh->kuc_msgtype = type;
1970         lh->kuc_msglen = len;
1971
1972         return (void *)(lh + 1);
1973 }
1974 EXPORT_SYMBOL(kuc_alloc);
1975
/* Free a message given a pointer to its payload area.  @payload_len must
 * be the payload length the message was sized with, since the freed size
 * is recomputed from it. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
1982 EXPORT_SYMBOL(kuc_free);
1983
1984 struct obd_request_slot_waiter {
1985         struct list_head        orsw_entry;
1986         wait_queue_head_t       orsw_waitq;
1987         bool                    orsw_signaled;
1988 };
1989
1990 static bool obd_request_slot_avail(struct client_obd *cli,
1991                                    struct obd_request_slot_waiter *orsw)
1992 {
1993         bool avail;
1994
1995         spin_lock(&cli->cl_loi_list_lock);
1996         avail = !!list_empty(&orsw->orsw_entry);
1997         spin_unlock(&cli->cl_loi_list_lock);
1998
1999         return avail;
2000 };
2001
2002 /*
2003  * For network flow control, the RPC sponsor needs to acquire a credit
2004  * before sending the RPC. The credits count for a connection is defined
2005  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2006  * the subsequent RPC sponsors need to wait until others released their
2007  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2008  */
2009 int obd_get_request_slot(struct client_obd *cli)
2010 {
2011         struct obd_request_slot_waiter   orsw;
2012         struct l_wait_info               lwi;
2013         int                              rc;
2014
2015         spin_lock(&cli->cl_loi_list_lock);
2016         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2017                 cli->cl_rpcs_in_flight++;
2018                 spin_unlock(&cli->cl_loi_list_lock);
2019                 return 0;
2020         }
2021
2022         init_waitqueue_head(&orsw.orsw_waitq);
2023         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2024         orsw.orsw_signaled = false;
2025         spin_unlock(&cli->cl_loi_list_lock);
2026
2027         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2028         rc = l_wait_event(orsw.orsw_waitq,
2029                           obd_request_slot_avail(cli, &orsw) ||
2030                           orsw.orsw_signaled,
2031                           &lwi);
2032
2033         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2034          * freed but other (such as obd_put_request_slot) is using it. */
2035         spin_lock(&cli->cl_loi_list_lock);
2036         if (rc != 0) {
2037                 if (!orsw.orsw_signaled) {
2038                         if (list_empty(&orsw.orsw_entry))
2039                                 cli->cl_rpcs_in_flight--;
2040                         else
2041                                 list_del(&orsw.orsw_entry);
2042                 }
2043         }
2044
2045         if (orsw.orsw_signaled) {
2046                 LASSERT(list_empty(&orsw.orsw_entry));
2047
2048                 rc = -EINTR;
2049         }
2050         spin_unlock(&cli->cl_loi_list_lock);
2051
2052         return rc;
2053 }
2054 EXPORT_SYMBOL(obd_get_request_slot);
2055
2056 void obd_put_request_slot(struct client_obd *cli)
2057 {
2058         struct obd_request_slot_waiter *orsw;
2059
2060         spin_lock(&cli->cl_loi_list_lock);
2061         cli->cl_rpcs_in_flight--;
2062
2063         /* If there is free slot, wakeup the first waiter. */
2064         if (!list_empty(&cli->cl_flight_waiters) &&
2065             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2066                 orsw = list_entry(cli->cl_flight_waiters.next,
2067                                   struct obd_request_slot_waiter, orsw_entry);
2068                 list_del_init(&orsw->orsw_entry);
2069                 cli->cl_rpcs_in_flight++;
2070                 wake_up(&orsw->orsw_waitq);
2071         }
2072         spin_unlock(&cli->cl_loi_list_lock);
2073 }
2074 EXPORT_SYMBOL(obd_put_request_slot);
2075
2076 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2077 {
2078         return cli->cl_max_rpcs_in_flight;
2079 }
2080 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2081
2082 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2083 {
2084         struct obd_request_slot_waiter *orsw;
2085         __u32                           old;
2086         int                             diff;
2087         int                             i;
2088         const char *type_name;
2089         int                             rc;
2090
2091         if (max > OBD_MAX_RIF_MAX || max < 1)
2092                 return -ERANGE;
2093
2094         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2095         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2096                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2097                  * strictly lower that max_rpcs_in_flight */
2098                 if (max < 2) {
2099                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2100                                "because it must be higher than "
2101                                "max_mod_rpcs_in_flight value",
2102                                cli->cl_import->imp_obd->obd_name);
2103                         return -ERANGE;
2104                 }
2105                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2106                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2107                         if (rc != 0)
2108                                 return rc;
2109                 }
2110         }
2111
2112         spin_lock(&cli->cl_loi_list_lock);
2113         old = cli->cl_max_rpcs_in_flight;
2114         cli->cl_max_rpcs_in_flight = max;
2115         client_adjust_max_dirty(cli);
2116
2117         diff = max - old;
2118
2119         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2120         for (i = 0; i < diff; i++) {
2121                 if (list_empty(&cli->cl_flight_waiters))
2122                         break;
2123
2124                 orsw = list_entry(cli->cl_flight_waiters.next,
2125                                   struct obd_request_slot_waiter, orsw_entry);
2126                 list_del_init(&orsw->orsw_entry);
2127                 cli->cl_rpcs_in_flight++;
2128                 wake_up(&orsw->orsw_waitq);
2129         }
2130         spin_unlock(&cli->cl_loi_list_lock);
2131
2132         return 0;
2133 }
2134 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2135
2136 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2137 {
2138         return cli->cl_max_mod_rpcs_in_flight;
2139 }
2140 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2141
2142 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2143 {
2144         struct obd_connect_data *ocd;
2145         __u16 maxmodrpcs;
2146         __u16 prev;
2147
2148         if (max > OBD_MAX_RIF_MAX || max < 1)
2149                 return -ERANGE;
2150
2151         /* cannot exceed or equal max_rpcs_in_flight */
2152         if (max >= cli->cl_max_rpcs_in_flight) {
2153                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2154                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2155                        cli->cl_import->imp_obd->obd_name,
2156                        max, cli->cl_max_rpcs_in_flight);
2157                 return -ERANGE;
2158         }
2159
2160         /* cannot exceed max modify RPCs in flight supported by the server */
2161         ocd = &cli->cl_import->imp_connect_data;
2162         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2163                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2164         else
2165                 maxmodrpcs = 1;
2166         if (max > maxmodrpcs) {
2167                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2168                        "higher than max_mod_rpcs_per_client value (%hu) "
2169                        "returned by the server at connection\n",
2170                        cli->cl_import->imp_obd->obd_name,
2171                        max, maxmodrpcs);
2172                 return -ERANGE;
2173         }
2174
2175         spin_lock(&cli->cl_mod_rpcs_lock);
2176
2177         prev = cli->cl_max_mod_rpcs_in_flight;
2178         cli->cl_max_mod_rpcs_in_flight = max;
2179
2180         /* wakeup waiters if limit has been increased */
2181         if (cli->cl_max_mod_rpcs_in_flight > prev)
2182                 wake_up(&cli->cl_mod_rpcs_waitq);
2183
2184         spin_unlock(&cli->cl_mod_rpcs_lock);
2185
2186         return 0;
2187 }
2188 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2189
2190 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2191                                struct seq_file *seq)
2192 {
2193         unsigned long mod_tot = 0, mod_cum;
2194         struct timespec64 now;
2195         int i;
2196
2197         ktime_get_real_ts64(&now);
2198
2199         spin_lock(&cli->cl_mod_rpcs_lock);
2200
2201         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2202                    (s64)now.tv_sec, now.tv_nsec);
2203         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2204                    cli->cl_mod_rpcs_in_flight);
2205
2206         seq_printf(seq, "\n\t\t\tmodify\n");
2207         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2208
2209         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2210
2211         mod_cum = 0;
2212         for (i = 0; i < OBD_HIST_MAX; i++) {
2213                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2214                 mod_cum += mod;
2215                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2216                            i, mod, pct(mod, mod_tot),
2217                            pct(mod_cum, mod_tot));
2218                 if (mod_cum == mod_tot)
2219                         break;
2220         }
2221
2222         spin_unlock(&cli->cl_mod_rpcs_lock);
2223
2224         return 0;
2225 }
2226 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2227
2228 /* The number of modify RPCs sent in parallel is limited
2229  * because the server has a finite number of slots per client to
2230  * store request result and ensure reply reconstruction when needed.
2231  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2232  * that takes into account server limit and cl_max_rpcs_in_flight
2233  * value.
2234  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2235  * one close request is allowed above the maximum.
2236  */
2237 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2238                                                  bool close_req)
2239 {
2240         bool avail;
2241
2242         /* A slot is available if
2243          * - number of modify RPCs in flight is less than the max
2244          * - it's a close RPC and no other close request is in flight
2245          */
2246         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2247                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2248
2249         return avail;
2250 }
2251
2252 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2253                                          bool close_req)
2254 {
2255         bool avail;
2256
2257         spin_lock(&cli->cl_mod_rpcs_lock);
2258         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2259         spin_unlock(&cli->cl_mod_rpcs_lock);
2260         return avail;
2261 }
2262
2263
2264 /* Get a modify RPC slot from the obd client @cli according
2265  * to the kind of operation @opc that is going to be sent
2266  * and the intent @it of the operation if it applies.
2267  * If the maximum number of modify RPCs in flight is reached
2268  * the thread is put to sleep.
2269  * Returns the tag to be set in the request message. Tag 0
2270  * is reserved for non-modifying requests.
2271  */
2272 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2273 {
2274         bool                    close_req = false;
2275         __u16                   i, max;
2276
2277         if (opc == MDS_CLOSE)
2278                 close_req = true;
2279
2280         do {
2281                 spin_lock(&cli->cl_mod_rpcs_lock);
2282                 max = cli->cl_max_mod_rpcs_in_flight;
2283                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2284                         /* there is a slot available */
2285                         cli->cl_mod_rpcs_in_flight++;
2286                         if (close_req)
2287                                 cli->cl_close_rpcs_in_flight++;
2288                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2289                                          cli->cl_mod_rpcs_in_flight);
2290                         /* find a free tag */
2291                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2292                                                 max + 1);
2293                         LASSERT(i < OBD_MAX_RIF_MAX);
2294                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2295                         spin_unlock(&cli->cl_mod_rpcs_lock);
2296                         /* tag 0 is reserved for non-modify RPCs */
2297
2298                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2299                                "opc %u, max %hu\n",
2300                                cli->cl_import->imp_obd->obd_name,
2301                                i + 1, opc, max);
2302
2303                         return i + 1;
2304                 }
2305                 spin_unlock(&cli->cl_mod_rpcs_lock);
2306
2307                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2308                        "opc %u, max %hu\n",
2309                        cli->cl_import->imp_obd->obd_name, opc, max);
2310
2311                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2312                                           obd_mod_rpc_slot_avail(cli,
2313                                                                  close_req));
2314         } while (true);
2315 }
2316 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2317
2318 /* Put a modify RPC slot from the obd client @cli according
2319  * to the kind of operation @opc that has been sent.
2320  */
2321 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2322 {
2323         bool                    close_req = false;
2324
2325         if (tag == 0)
2326                 return;
2327
2328         if (opc == MDS_CLOSE)
2329                 close_req = true;
2330
2331         spin_lock(&cli->cl_mod_rpcs_lock);
2332         cli->cl_mod_rpcs_in_flight--;
2333         if (close_req)
2334                 cli->cl_close_rpcs_in_flight--;
2335         /* release the tag in the bitmap */
2336         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2337         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2338         spin_unlock(&cli->cl_mod_rpcs_lock);
2339         wake_up(&cli->cl_mod_rpcs_waitq);
2340 }
2341 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2342