Whamcloud - gitweb
729cdd09fb57748165fac3b0d65a215bb9c4fcdb
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions; they provide generic
35  * infrastructure for managing object devices.
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143                 /* class_search_type() returned a counted reference,
144                  * but we don't need that count any more as
145                  * we have one through typ_refcnt.
146                  */
147                 kobject_put(&type->typ_kobj);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         if (type->typ_md_ops)
176                 OBD_FREE_PTR(type->typ_md_ops);
177         if (type->typ_dt_ops)
178                 OBD_FREE_PTR(type->typ_dt_ops);
179
180         OBD_FREE(type, sizeof(*type));
181 }
182
183 static struct kobj_type class_ktype = {
184         .sysfs_ops      = &lustre_sysfs_ops,
185         .release        = class_sysfs_release,
186 };
187
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
190 {
191         struct dentry *symlink;
192         struct obd_type *type;
193         int rc;
194
195         type = class_search_type(name);
196         if (type) {
197                 kobject_put(&type->typ_kobj);
198                 return ERR_PTR(-EEXIST);
199         }
200
201         OBD_ALLOC(type, sizeof(*type));
202         if (!type)
203                 return ERR_PTR(-ENOMEM);
204
205         type->typ_kobj.kset = lustre_kset;
206         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207                                   &lustre_kset->kobj, "%s", name);
208         if (rc)
209                 return ERR_PTR(rc);
210
211         symlink = debugfs_create_dir(name, debugfs_lustre_root);
212         if (IS_ERR_OR_NULL(symlink)) {
213                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214                 kobject_put(&type->typ_kobj);
215                 return ERR_PTR(rc);
216         }
217         type->typ_debugfs_entry = symlink;
218         type->typ_sym_filter = true;
219
220         if (enable_proc) {
221                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
222                                                       NULL, NULL);
223                 if (IS_ERR(type->typ_procroot)) {
224                         CERROR("%s: can't create compat proc entry: %d\n",
225                                name, (int)PTR_ERR(type->typ_procroot));
226                         type->typ_procroot = NULL;
227                 }
228         }
229
230         return type;
231 }
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
234
235 #define CLASS_MAX_NAME 1024
236
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238                         bool enable_proc, struct lprocfs_vars *vars,
239                         const char *name, struct lu_device_type *ldt)
240 {
241         struct obd_type *type;
242         int rc;
243
244         ENTRY;
245         /* sanity check */
246         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
247
248         type = class_search_type(name);
249         if (type) {
250 #ifdef HAVE_SERVER_SUPPORT
251                 if (type->typ_sym_filter)
252                         goto dir_exist;
253 #endif /* HAVE_SERVER_SUPPORT */
254                 kobject_put(&type->typ_kobj);
255                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
256                 RETURN(-EEXIST);
257         }
258
259         OBD_ALLOC(type, sizeof(*type));
260         if (type == NULL)
261                 RETURN(-ENOMEM);
262
263         type->typ_kobj.kset = lustre_kset;
264         kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
266 dir_exist:
267 #endif /* HAVE_SERVER_SUPPORT */
268         OBD_ALLOC_PTR(type->typ_dt_ops);
269         OBD_ALLOC_PTR(type->typ_md_ops);
270
271         if (type->typ_dt_ops == NULL ||
272             type->typ_md_ops == NULL)
273                 GOTO (failed, rc = -ENOMEM);
274
275         *(type->typ_dt_ops) = *dt_ops;
276         /* md_ops is optional */
277         if (md_ops)
278                 *(type->typ_md_ops) = *md_ops;
279         spin_lock_init(&type->obd_type_lock);
280
281 #ifdef HAVE_SERVER_SUPPORT
282         if (type->typ_sym_filter) {
283                 type->typ_sym_filter = false;
284                 kobject_put(&type->typ_kobj);
285                 goto setup_ldt;
286         }
287 #endif
288 #ifdef CONFIG_PROC_FS
289         if (enable_proc && !type->typ_procroot) {
290                 type->typ_procroot = lprocfs_register(name,
291                                                       proc_lustre_root,
292                                                       NULL, type);
293                 if (IS_ERR(type->typ_procroot)) {
294                         rc = PTR_ERR(type->typ_procroot);
295                         type->typ_procroot = NULL;
296                         GOTO(failed, rc);
297                 }
298         }
299 #endif
300         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
301                                                     vars, type);
302         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
303                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
304                                              : -ENOMEM;
305                 type->typ_debugfs_entry = NULL;
306                 GOTO(failed, rc);
307         }
308
309         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
310         if (rc)
311                 GOTO(failed, rc);
312 #ifdef HAVE_SERVER_SUPPORT
313 setup_ldt:
314 #endif
315         if (ldt) {
316                 type->typ_lu = ldt;
317                 rc = lu_device_type_init(ldt);
318                 if (rc)
319                         GOTO(failed, rc);
320         }
321
322         RETURN(0);
323
324 failed:
325         kobject_put(&type->typ_kobj);
326
327         RETURN(rc);
328 }
329 EXPORT_SYMBOL(class_register_type);
330
331 int class_unregister_type(const char *name)
332 {
333         struct obd_type *type = class_search_type(name);
334         int rc = 0;
335         ENTRY;
336
337         if (!type) {
338                 CERROR("unknown obd type\n");
339                 RETURN(-EINVAL);
340         }
341
342         if (type->typ_refcnt) {
343                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
344                 /* This is a bad situation, let's make the best of it */
345                 /* Remove ops, but leave the name for debugging */
346                 OBD_FREE_PTR(type->typ_dt_ops);
347                 OBD_FREE_PTR(type->typ_md_ops);
348                 GOTO(out_put, rc = -EBUSY);
349         }
350
351         /* Put the final ref */
352         kobject_put(&type->typ_kobj);
353 out_put:
354         /* Put the ref returned by class_search_type() */
355         kobject_put(&type->typ_kobj);
356
357         RETURN(rc);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
360
361 /**
362  * Create a new obd device.
363  *
364  * Allocate the new obd_device and initialize it.
365  *
366  * \param[in] type_name obd device type string.
367  * \param[in] name      obd device name.
368  * \param[in] uuid      obd device UUID
369  *
370  * \retval newdev         pointer to created obd_device
371  * \retval ERR_PTR(errno) on error
372  */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
374                                 const char *uuid)
375 {
376         struct obd_device *newdev;
377         struct obd_type *type = NULL;
378         ENTRY;
379
380         if (strlen(name) >= MAX_OBD_NAME) {
381                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382                 RETURN(ERR_PTR(-EINVAL));
383         }
384
385         type = class_get_type(type_name);
386         if (type == NULL){
387                 CERROR("OBD: unknown type: %s\n", type_name);
388                 RETURN(ERR_PTR(-ENODEV));
389         }
390
391         newdev = obd_device_alloc();
392         if (newdev == NULL) {
393                 class_put_type(type);
394                 RETURN(ERR_PTR(-ENOMEM));
395         }
396         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398         newdev->obd_type = type;
399         newdev->obd_minor = -1;
400
401         rwlock_init(&newdev->obd_pool_lock);
402         newdev->obd_pool_limit = 0;
403         newdev->obd_pool_slv = 0;
404
405         INIT_LIST_HEAD(&newdev->obd_exports);
406         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408         INIT_LIST_HEAD(&newdev->obd_exports_timed);
409         INIT_LIST_HEAD(&newdev->obd_nid_stats);
410         spin_lock_init(&newdev->obd_nid_lock);
411         spin_lock_init(&newdev->obd_dev_lock);
412         mutex_init(&newdev->obd_dev_mutex);
413         spin_lock_init(&newdev->obd_osfs_lock);
414         /* newdev->obd_osfs_age must be set to a value in the distant
415          * past to guarantee a fresh statfs is fetched on mount. */
416         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
417
418         /* XXX belongs in setup not attach  */
419         init_rwsem(&newdev->obd_observer_link_sem);
420         /* recovery data */
421         spin_lock_init(&newdev->obd_recovery_task_lock);
422         init_waitqueue_head(&newdev->obd_next_transno_waitq);
423         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427         INIT_LIST_HEAD(&newdev->obd_evict_list);
428         INIT_LIST_HEAD(&newdev->obd_lwp_list);
429
430         llog_group_init(&newdev->obd_olg);
431         /* Detach drops this */
432         atomic_set(&newdev->obd_refcount, 1);
433         lu_ref_init(&newdev->obd_reference);
434         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
435
436         newdev->obd_conn_inprogress = 0;
437
438         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
439
440         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441                newdev->obd_name, newdev);
442
443         return newdev;
444 }
445
446 /**
447  * Free obd device.
448  *
449  * \param[in] obd obd_device to be freed
450  *
451  * \retval none
452  */
453 void class_free_dev(struct obd_device *obd)
454 {
455         struct obd_type *obd_type = obd->obd_type;
456
457         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460                  "obd %p != obd_devs[%d] %p\n",
461                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463                  "obd_refcount should be 0, not %d\n",
464                  atomic_read(&obd->obd_refcount));
465         LASSERT(obd_type != NULL);
466
467         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468                obd->obd_name, obd->obd_type->typ_name);
469
470         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471                          obd->obd_name, obd->obd_uuid.uuid);
472         if (obd->obd_stopping) {
473                 int err;
474
475                 /* If we're not stopping, we were never set up */
476                 err = obd_cleanup(obd);
477                 if (err)
478                         CERROR("Cleanup %s returned %d\n",
479                                 obd->obd_name, err);
480         }
481
482         obd_device_free(obd);
483
484         class_put_type(obd_type);
485 }
486
487 /**
488  * Unregister obd device.
489  *
490  * Free slot in obd_dev[] used by \a obd.
491  *
492  * \param[in] new_obd obd_device to be unregistered
493  *
494  * \retval none
495  */
496 void class_unregister_device(struct obd_device *obd)
497 {
498         write_lock(&obd_dev_lock);
499         if (obd->obd_minor >= 0) {
500                 LASSERT(obd_devs[obd->obd_minor] == obd);
501                 obd_devs[obd->obd_minor] = NULL;
502                 obd->obd_minor = -1;
503         }
504         write_unlock(&obd_dev_lock);
505 }
506
507 /**
508  * Register obd device.
509  *
510  * Find free slot in obd_devs[], fills it with \a new_obd.
511  *
512  * \param[in] new_obd obd_device to be registered
513  *
514  * \retval 0          success
515  * \retval -EEXIST    device with this name is registered
516  * \retval -EOVERFLOW obd_devs[] is full
517  */
518 int class_register_device(struct obd_device *new_obd)
519 {
520         int ret = 0;
521         int i;
522         int new_obd_minor = 0;
523         bool minor_assign = false;
524         bool retried = false;
525
526 again:
527         write_lock(&obd_dev_lock);
528         for (i = 0; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530
531                 if (obd != NULL &&
532                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
533
534                         if (!retried) {
535                                 write_unlock(&obd_dev_lock);
536
537                                 /* the obd_device could be waited to be
538                                  * destroyed by the "obd_zombie_impexp_thread".
539                                  */
540                                 obd_zombie_barrier();
541                                 retried = true;
542                                 goto again;
543                         }
544
545                         CERROR("%s: already exists, won't add\n",
546                                obd->obd_name);
547                         /* in case we found a free slot before duplicate */
548                         minor_assign = false;
549                         ret = -EEXIST;
550                         break;
551                 }
552                 if (!minor_assign && obd == NULL) {
553                         new_obd_minor = i;
554                         minor_assign = true;
555                 }
556         }
557
558         if (minor_assign) {
559                 new_obd->obd_minor = new_obd_minor;
560                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562                 obd_devs[new_obd_minor] = new_obd;
563         } else {
564                 if (ret == 0) {
565                         ret = -EOVERFLOW;
566                         CERROR("%s: all %u/%u devices used, increase "
567                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568                                i, class_devno_max(), ret);
569                 }
570         }
571         write_unlock(&obd_dev_lock);
572
573         RETURN(ret);
574 }
575
576 static int class_name2dev_nolock(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         for (i = 0; i < class_devno_max(); i++) {
584                 struct obd_device *obd = class_num2obd(i);
585
586                 if (obd && strcmp(name, obd->obd_name) == 0) {
587                         /* Make sure we finished attaching before we give
588                            out any references */
589                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590                         if (obd->obd_attached) {
591                                 return i;
592                         }
593                         break;
594                 }
595         }
596
597         return -1;
598 }
599
600 int class_name2dev(const char *name)
601 {
602         int i;
603
604         if (!name)
605                 return -1;
606
607         read_lock(&obd_dev_lock);
608         i = class_name2dev_nolock(name);
609         read_unlock(&obd_dev_lock);
610
611         return i;
612 }
613 EXPORT_SYMBOL(class_name2dev);
614
615 struct obd_device *class_name2obd(const char *name)
616 {
617         int dev = class_name2dev(name);
618
619         if (dev < 0 || dev > class_devno_max())
620                 return NULL;
621         return class_num2obd(dev);
622 }
623 EXPORT_SYMBOL(class_name2obd);
624
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
626 {
627         int i;
628
629         for (i = 0; i < class_devno_max(); i++) {
630                 struct obd_device *obd = class_num2obd(i);
631
632                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
634                         return i;
635                 }
636         }
637
638         return -1;
639 }
640
641 int class_uuid2dev(struct obd_uuid *uuid)
642 {
643         int i;
644
645         read_lock(&obd_dev_lock);
646         i = class_uuid2dev_nolock(uuid);
647         read_unlock(&obd_dev_lock);
648
649         return i;
650 }
651 EXPORT_SYMBOL(class_uuid2dev);
652
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
654 {
655         int dev = class_uuid2dev(uuid);
656         if (dev < 0)
657                 return NULL;
658         return class_num2obd(dev);
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 /**
663  * Get obd device from ::obd_devs[]
664  *
665  * \param num [in] array index
666  *
667  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668  *         otherwise return the obd device there.
669  */
670 struct obd_device *class_num2obd(int num)
671 {
672         struct obd_device *obd = NULL;
673
674         if (num < class_devno_max()) {
675                 obd = obd_devs[num];
676                 if (obd == NULL)
677                         return NULL;
678
679                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680                          "%p obd_magic %08x != %08x\n",
681                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682                 LASSERTF(obd->obd_minor == num,
683                          "%p obd_minor %0d != %0d\n",
684                          obd, obd->obd_minor, num);
685         }
686
687         return obd;
688 }
689
690 /**
691  * Find obd in obd_dev[] by name or uuid.
692  *
693  * Increment obd's refcount if found.
694  *
695  * \param[in] str obd name or uuid
696  *
697  * \retval NULL    if not found
698  * \retval target  pointer to found obd_device
699  */
700 struct obd_device *class_dev_by_str(const char *str)
701 {
702         struct obd_device *target = NULL;
703         struct obd_uuid tgtuuid;
704         int rc;
705
706         obd_str2uuid(&tgtuuid, str);
707
708         read_lock(&obd_dev_lock);
709         rc = class_uuid2dev_nolock(&tgtuuid);
710         if (rc < 0)
711                 rc = class_name2dev_nolock(str);
712
713         if (rc >= 0)
714                 target = class_num2obd(rc);
715
716         if (target != NULL)
717                 class_incref(target, "find", current);
718         read_unlock(&obd_dev_lock);
719
720         RETURN(target);
721 }
722 EXPORT_SYMBOL(class_dev_by_str);
723
724 /**
725  * Get obd devices count. Device in any
726  *    state are counted
727  * \retval obd device count
728  */
729 int get_devices_count(void)
730 {
731         int index, max_index = class_devno_max(), dev_count = 0;
732
733         read_lock(&obd_dev_lock);
734         for (index = 0; index <= max_index; index++) {
735                 struct obd_device *obd = class_num2obd(index);
736                 if (obd != NULL)
737                         dev_count++;
738         }
739         read_unlock(&obd_dev_lock);
740
741         return dev_count;
742 }
743 EXPORT_SYMBOL(get_devices_count);
744
745 void class_obd_list(void)
746 {
747         char *status;
748         int i;
749
750         read_lock(&obd_dev_lock);
751         for (i = 0; i < class_devno_max(); i++) {
752                 struct obd_device *obd = class_num2obd(i);
753
754                 if (obd == NULL)
755                         continue;
756                 if (obd->obd_stopping)
757                         status = "ST";
758                 else if (obd->obd_set_up)
759                         status = "UP";
760                 else if (obd->obd_attached)
761                         status = "AT";
762                 else
763                         status = "--";
764                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765                          i, status, obd->obd_type->typ_name,
766                          obd->obd_name, obd->obd_uuid.uuid,
767                          atomic_read(&obd->obd_refcount));
768         }
769         read_unlock(&obd_dev_lock);
770         return;
771 }
772
773 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
774    specified, then only the client with that uuid is returned,
775    otherwise any client connected to the tgt is returned. */
776 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
777                                           const char *type_name,
778                                           struct obd_uuid *grp_uuid)
779 {
780         int i;
781
782         read_lock(&obd_dev_lock);
783         for (i = 0; i < class_devno_max(); i++) {
784                 struct obd_device *obd = class_num2obd(i);
785
786                 if (obd == NULL)
787                         continue;
788                 if ((strncmp(obd->obd_type->typ_name, type_name,
789                              strlen(type_name)) == 0)) {
790                         if (obd_uuid_equals(tgt_uuid,
791                                             &obd->u.cli.cl_target_uuid) &&
792                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
793                                                          &obd->obd_uuid) : 1)) {
794                                 read_unlock(&obd_dev_lock);
795                                 return obd;
796                         }
797                 }
798         }
799         read_unlock(&obd_dev_lock);
800
801         return NULL;
802 }
803 EXPORT_SYMBOL(class_find_client_obd);
804
805 /* Iterate the obd_device list looking devices have grp_uuid. Start
806    searching at *next, and if a device is found, the next index to look
807    at is saved in *next. If next is NULL, then the first matching device
808    will always be returned. */
809 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
810 {
811         int i;
812
813         if (next == NULL)
814                 i = 0;
815         else if (*next >= 0 && *next < class_devno_max())
816                 i = *next;
817         else
818                 return NULL;
819
820         read_lock(&obd_dev_lock);
821         for (; i < class_devno_max(); i++) {
822                 struct obd_device *obd = class_num2obd(i);
823
824                 if (obd == NULL)
825                         continue;
826                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
827                         if (next != NULL)
828                                 *next = i+1;
829                         read_unlock(&obd_dev_lock);
830                         return obd;
831                 }
832         }
833         read_unlock(&obd_dev_lock);
834
835         return NULL;
836 }
837 EXPORT_SYMBOL(class_devices_in_group);
838
839 /**
840  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
841  * adjust sptlrpc settings accordingly.
842  */
843 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
844 {
845         struct obd_device  *obd;
846         const char         *type;
847         int                 i, rc = 0, rc2;
848
849         LASSERT(namelen > 0);
850
851         read_lock(&obd_dev_lock);
852         for (i = 0; i < class_devno_max(); i++) {
853                 obd = class_num2obd(i);
854
855                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
856                         continue;
857
858                 /* only notify mdc, osc, osp, lwp, mdt, ost
859                  * because only these have a -sptlrpc llog */
860                 type = obd->obd_type->typ_name;
861                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
862                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
863                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
864                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
865                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
866                     strcmp(type, LUSTRE_OST_NAME) != 0)
867                         continue;
868
869                 if (strncmp(obd->obd_name, fsname, namelen))
870                         continue;
871
872                 class_incref(obd, __FUNCTION__, obd);
873                 read_unlock(&obd_dev_lock);
874                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
875                                          sizeof(KEY_SPTLRPC_CONF),
876                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
877                 rc = rc ? rc : rc2;
878                 class_decref(obd, __FUNCTION__, obd);
879                 read_lock(&obd_dev_lock);
880         }
881         read_unlock(&obd_dev_lock);
882         return rc;
883 }
884 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
885
886 void obd_cleanup_caches(void)
887 {
888         ENTRY;
889         if (obd_device_cachep) {
890                 kmem_cache_destroy(obd_device_cachep);
891                 obd_device_cachep = NULL;
892         }
893
894         EXIT;
895 }
896
897 int obd_init_caches(void)
898 {
899         int rc;
900         ENTRY;
901
902         LASSERT(obd_device_cachep == NULL);
903         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
904                                 sizeof(struct obd_device),
905                                 0, 0, 0, sizeof(struct obd_device), NULL);
906         if (!obd_device_cachep)
907                 GOTO(out, rc = -ENOMEM);
908
909         RETURN(0);
910 out:
911         obd_cleanup_caches();
912         RETURN(rc);
913 }
914
915 static struct portals_handle_ops export_handle_ops;
916
917 /* map connection to client */
918 struct obd_export *class_conn2export(struct lustre_handle *conn)
919 {
920         struct obd_export *export;
921         ENTRY;
922
923         if (!conn) {
924                 CDEBUG(D_CACHE, "looking for null handle\n");
925                 RETURN(NULL);
926         }
927
928         if (conn->cookie == -1) {  /* this means assign a new connection */
929                 CDEBUG(D_CACHE, "want a new connection\n");
930                 RETURN(NULL);
931         }
932
933         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
934         export = class_handle2object(conn->cookie, &export_handle_ops);
935         RETURN(export);
936 }
937 EXPORT_SYMBOL(class_conn2export);
938
939 struct obd_device *class_exp2obd(struct obd_export *exp)
940 {
941         if (exp)
942                 return exp->exp_obd;
943         return NULL;
944 }
945 EXPORT_SYMBOL(class_exp2obd);
946
947 struct obd_import *class_exp2cliimp(struct obd_export *exp)
948 {
949         struct obd_device *obd = exp->exp_obd;
950         if (obd == NULL)
951                 return NULL;
952         return obd->u.cli.cl_import;
953 }
954 EXPORT_SYMBOL(class_exp2cliimp);
955
956 /* Export management functions */
957 static void class_export_destroy(struct obd_export *exp)
958 {
959         struct obd_device *obd = exp->exp_obd;
960         ENTRY;
961
962         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
963         LASSERT(obd != NULL);
964
965         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
966                exp->exp_client_uuid.uuid, obd->obd_name);
967
968         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
969         if (exp->exp_connection)
970                 ptlrpc_put_connection_superhack(exp->exp_connection);
971
972         LASSERT(list_empty(&exp->exp_outstanding_replies));
973         LASSERT(list_empty(&exp->exp_uncommitted_replies));
974         LASSERT(list_empty(&exp->exp_req_replay_queue));
975         LASSERT(list_empty(&exp->exp_hp_rpcs));
976         obd_destroy_export(exp);
977         /* self export doesn't hold a reference to an obd, although it
978          * exists until freeing of the obd */
979         if (exp != obd->obd_self_export)
980                 class_decref(obd, "export", exp);
981
982         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
983         EXIT;
984 }
985
/* hop_addref hook for export handles: take one reference on the export. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
990
991 static struct portals_handle_ops export_handle_ops = {
992         .hop_addref = export_handle_addref,
993         .hop_free   = NULL,
994 };
995
996 struct obd_export *class_export_get(struct obd_export *exp)
997 {
998         atomic_inc(&exp->exp_refcount);
999         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1000                atomic_read(&exp->exp_refcount));
1001         return exp;
1002 }
1003 EXPORT_SYMBOL(class_export_get);
1004
1005 void class_export_put(struct obd_export *exp)
1006 {
1007         LASSERT(exp != NULL);
1008         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1009         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1010                atomic_read(&exp->exp_refcount) - 1);
1011
1012         if (atomic_dec_and_test(&exp->exp_refcount)) {
1013                 struct obd_device *obd = exp->exp_obd;
1014
1015                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1016                        exp, exp->exp_client_uuid.uuid);
1017
1018                 /* release nid stat refererence */
1019                 lprocfs_exp_cleanup(exp);
1020
1021                 if (exp == obd->obd_self_export) {
1022                         /* self export should be destroyed without
1023                          * zombie thread as it doesn't hold a
1024                          * reference to obd and doesn't hold any
1025                          * resources */
1026                         class_export_destroy(exp);
1027                         /* self export is destroyed, no class
1028                          * references exist and it is safe to free
1029                          * obd */
1030                         class_free_dev(obd);
1031                 } else {
1032                         LASSERT(!list_empty(&exp->exp_obd_chain));
1033                         obd_zombie_export_add(exp);
1034                 }
1035
1036         }
1037 }
1038 EXPORT_SYMBOL(class_export_put);
1039
1040 static void obd_zombie_exp_cull(struct work_struct *ws)
1041 {
1042         struct obd_export *export;
1043
1044         export = container_of(ws, struct obd_export, exp_zombie_work);
1045         class_export_destroy(export);
1046 }
1047
1048 /* Creates a new export, adds it to the hash table, and returns a
1049  * pointer to it. The refcount is 2: one for the hash reference, and
1050  * one for the pointer returned by this function. */
1051 struct obd_export *__class_new_export(struct obd_device *obd,
1052                                       struct obd_uuid *cluuid, bool is_self)
1053 {
1054         struct obd_export *export;
1055         struct cfs_hash *hash = NULL;
1056         int rc = 0;
1057         ENTRY;
1058
1059         OBD_ALLOC_PTR(export);
1060         if (!export)
1061                 return ERR_PTR(-ENOMEM);
1062
1063         export->exp_conn_cnt = 0;
1064         export->exp_lock_hash = NULL;
1065         export->exp_flock_hash = NULL;
1066         /* 2 = class_handle_hash + last */
1067         atomic_set(&export->exp_refcount, 2);
1068         atomic_set(&export->exp_rpc_count, 0);
1069         atomic_set(&export->exp_cb_count, 0);
1070         atomic_set(&export->exp_locks_count, 0);
1071 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1072         INIT_LIST_HEAD(&export->exp_locks_list);
1073         spin_lock_init(&export->exp_locks_list_guard);
1074 #endif
1075         atomic_set(&export->exp_replay_count, 0);
1076         export->exp_obd = obd;
1077         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1078         spin_lock_init(&export->exp_uncommitted_replies_lock);
1079         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1080         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1081         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1082         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1083         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1084         class_handle_hash(&export->exp_handle, &export_handle_ops);
1085         export->exp_last_request_time = ktime_get_real_seconds();
1086         spin_lock_init(&export->exp_lock);
1087         spin_lock_init(&export->exp_rpc_lock);
1088         INIT_HLIST_NODE(&export->exp_uuid_hash);
1089         INIT_HLIST_NODE(&export->exp_nid_hash);
1090         INIT_HLIST_NODE(&export->exp_gen_hash);
1091         spin_lock_init(&export->exp_bl_list_lock);
1092         INIT_LIST_HEAD(&export->exp_bl_list);
1093         INIT_LIST_HEAD(&export->exp_stale_list);
1094         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1095
1096         export->exp_sp_peer = LUSTRE_SP_ANY;
1097         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1098         export->exp_client_uuid = *cluuid;
1099         obd_init_export(export);
1100
1101         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1102                 spin_lock(&obd->obd_dev_lock);
1103                 /* shouldn't happen, but might race */
1104                 if (obd->obd_stopping)
1105                         GOTO(exit_unlock, rc = -ENODEV);
1106
1107                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1108                 if (hash == NULL)
1109                         GOTO(exit_unlock, rc = -ENODEV);
1110                 spin_unlock(&obd->obd_dev_lock);
1111
1112                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1113                 if (rc != 0) {
1114                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1115                                       obd->obd_name, cluuid->uuid, rc);
1116                         GOTO(exit_err, rc = -EALREADY);
1117                 }
1118         }
1119
1120         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1121         spin_lock(&obd->obd_dev_lock);
1122         if (obd->obd_stopping) {
1123                 if (hash)
1124                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1125                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1126         }
1127
1128         if (!is_self) {
1129                 class_incref(obd, "export", export);
1130                 list_add_tail(&export->exp_obd_chain_timed,
1131                               &obd->obd_exports_timed);
1132                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1133                 obd->obd_num_exports++;
1134         } else {
1135                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1136                 INIT_LIST_HEAD(&export->exp_obd_chain);
1137         }
1138         spin_unlock(&obd->obd_dev_lock);
1139         if (hash)
1140                 cfs_hash_putref(hash);
1141         RETURN(export);
1142
1143 exit_unlock:
1144         spin_unlock(&obd->obd_dev_lock);
1145 exit_err:
1146         if (hash)
1147                 cfs_hash_putref(hash);
1148         class_handle_unhash(&export->exp_handle);
1149         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1150         obd_destroy_export(export);
1151         OBD_FREE_PTR(export);
1152         return ERR_PTR(rc);
1153 }
1154
1155 struct obd_export *class_new_export(struct obd_device *obd,
1156                                     struct obd_uuid *uuid)
1157 {
1158         return __class_new_export(obd, uuid, false);
1159 }
1160 EXPORT_SYMBOL(class_new_export);
1161
1162 struct obd_export *class_new_export_self(struct obd_device *obd,
1163                                          struct obd_uuid *uuid)
1164 {
1165         return __class_new_export(obd, uuid, true);
1166 }
1167
1168 void class_unlink_export(struct obd_export *exp)
1169 {
1170         class_handle_unhash(&exp->exp_handle);
1171
1172         if (exp->exp_obd->obd_self_export == exp) {
1173                 class_export_put(exp);
1174                 return;
1175         }
1176
1177         spin_lock(&exp->exp_obd->obd_dev_lock);
1178         /* delete an uuid-export hashitem from hashtables */
1179         if (!hlist_unhashed(&exp->exp_uuid_hash))
1180                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1181                              &exp->exp_client_uuid,
1182                              &exp->exp_uuid_hash);
1183
1184 #ifdef HAVE_SERVER_SUPPORT
1185         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1186                 struct tg_export_data   *ted = &exp->exp_target_data;
1187                 struct cfs_hash         *hash;
1188
1189                 /* Because obd_gen_hash will not be released until
1190                  * class_cleanup(), so hash should never be NULL here */
1191                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1192                 LASSERT(hash != NULL);
1193                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1194                              &exp->exp_gen_hash);
1195                 cfs_hash_putref(hash);
1196         }
1197 #endif /* HAVE_SERVER_SUPPORT */
1198
1199         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1200         list_del_init(&exp->exp_obd_chain_timed);
1201         exp->exp_obd->obd_num_exports--;
1202         spin_unlock(&exp->exp_obd->obd_dev_lock);
1203         atomic_inc(&obd_stale_export_num);
1204
1205         /* A reference is kept by obd_stale_exports list */
1206         obd_stale_export_put(exp);
1207 }
1208 EXPORT_SYMBOL(class_unlink_export);
1209
1210 /* Import management functions */
1211 static void obd_zombie_import_free(struct obd_import *imp)
1212 {
1213         ENTRY;
1214
1215         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1216                 imp->imp_obd->obd_name);
1217
1218         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1219
1220         ptlrpc_put_connection_superhack(imp->imp_connection);
1221
1222         while (!list_empty(&imp->imp_conn_list)) {
1223                 struct obd_import_conn *imp_conn;
1224
1225                 imp_conn = list_entry(imp->imp_conn_list.next,
1226                                       struct obd_import_conn, oic_item);
1227                 list_del_init(&imp_conn->oic_item);
1228                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1229                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1230         }
1231
1232         LASSERT(imp->imp_sec == NULL);
1233         class_decref(imp->imp_obd, "import", imp);
1234         OBD_FREE_PTR(imp);
1235         EXIT;
1236 }
1237
1238 struct obd_import *class_import_get(struct obd_import *import)
1239 {
1240         atomic_inc(&import->imp_refcount);
1241         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1242                atomic_read(&import->imp_refcount),
1243                import->imp_obd->obd_name);
1244         return import;
1245 }
1246 EXPORT_SYMBOL(class_import_get);
1247
1248 void class_import_put(struct obd_import *imp)
1249 {
1250         ENTRY;
1251
1252         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1253
1254         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1255                atomic_read(&imp->imp_refcount) - 1,
1256                imp->imp_obd->obd_name);
1257
1258         if (atomic_dec_and_test(&imp->imp_refcount)) {
1259                 CDEBUG(D_INFO, "final put import %p\n", imp);
1260                 obd_zombie_import_add(imp);
1261         }
1262
1263         /* catch possible import put race */
1264         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1265         EXIT;
1266 }
1267 EXPORT_SYMBOL(class_import_put);
1268
1269 static void init_imp_at(struct imp_at *at) {
1270         int i;
1271         at_init(&at->iat_net_latency, 0, 0);
1272         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1273                 /* max service estimates are tracked on the server side, so
1274                    don't use the AT history here, just use the last reported
1275                    val. (But keep hist for proc histogram, worst_ever) */
1276                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1277                         AT_FLG_NOHIST);
1278         }
1279 }
1280
1281 static void obd_zombie_imp_cull(struct work_struct *ws)
1282 {
1283         struct obd_import *import;
1284
1285         import = container_of(ws, struct obd_import, imp_zombie_work);
1286         obd_zombie_import_free(import);
1287 }
1288
1289 struct obd_import *class_new_import(struct obd_device *obd)
1290 {
1291         struct obd_import *imp;
1292         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1293
1294         OBD_ALLOC(imp, sizeof(*imp));
1295         if (imp == NULL)
1296                 return NULL;
1297
1298         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1299         INIT_LIST_HEAD(&imp->imp_replay_list);
1300         INIT_LIST_HEAD(&imp->imp_sending_list);
1301         INIT_LIST_HEAD(&imp->imp_delayed_list);
1302         INIT_LIST_HEAD(&imp->imp_committed_list);
1303         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1304         imp->imp_known_replied_xid = 0;
1305         imp->imp_replay_cursor = &imp->imp_committed_list;
1306         spin_lock_init(&imp->imp_lock);
1307         imp->imp_last_success_conn = 0;
1308         imp->imp_state = LUSTRE_IMP_NEW;
1309         imp->imp_obd = class_incref(obd, "import", imp);
1310         rwlock_init(&imp->imp_sec_lock);
1311         init_waitqueue_head(&imp->imp_recovery_waitq);
1312         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1313
1314         if (curr_pid_ns->child_reaper)
1315                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1316         else
1317                 imp->imp_sec_refpid = 1;
1318
1319         atomic_set(&imp->imp_refcount, 2);
1320         atomic_set(&imp->imp_unregistering, 0);
1321         atomic_set(&imp->imp_inflight, 0);
1322         atomic_set(&imp->imp_replay_inflight, 0);
1323         atomic_set(&imp->imp_inval_count, 0);
1324         INIT_LIST_HEAD(&imp->imp_conn_list);
1325         init_imp_at(&imp->imp_at);
1326
1327         /* the default magic is V2, will be used in connect RPC, and
1328          * then adjusted according to the flags in request/reply. */
1329         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1330
1331         return imp;
1332 }
1333 EXPORT_SYMBOL(class_new_import);
1334
1335 void class_destroy_import(struct obd_import *import)
1336 {
1337         LASSERT(import != NULL);
1338         LASSERT(import != LP_POISON);
1339
1340         spin_lock(&import->imp_lock);
1341         import->imp_generation++;
1342         spin_unlock(&import->imp_lock);
1343         class_import_put(import);
1344 }
1345 EXPORT_SYMBOL(class_destroy_import);
1346
1347 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1348
1349 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1350 {
1351         spin_lock(&exp->exp_locks_list_guard);
1352
1353         LASSERT(lock->l_exp_refs_nr >= 0);
1354
1355         if (lock->l_exp_refs_target != NULL &&
1356             lock->l_exp_refs_target != exp) {
1357                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1358                               exp, lock, lock->l_exp_refs_target);
1359         }
1360         if ((lock->l_exp_refs_nr ++) == 0) {
1361                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1362                 lock->l_exp_refs_target = exp;
1363         }
1364         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1365                lock, exp, lock->l_exp_refs_nr);
1366         spin_unlock(&exp->exp_locks_list_guard);
1367 }
1368 EXPORT_SYMBOL(__class_export_add_lock_ref);
1369
1370 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1371 {
1372         spin_lock(&exp->exp_locks_list_guard);
1373         LASSERT(lock->l_exp_refs_nr > 0);
1374         if (lock->l_exp_refs_target != exp) {
1375                 LCONSOLE_WARN("lock %p, "
1376                               "mismatching export pointers: %p, %p\n",
1377                               lock, lock->l_exp_refs_target, exp);
1378         }
1379         if (-- lock->l_exp_refs_nr == 0) {
1380                 list_del_init(&lock->l_exp_refs_link);
1381                 lock->l_exp_refs_target = NULL;
1382         }
1383         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1384                lock, exp, lock->l_exp_refs_nr);
1385         spin_unlock(&exp->exp_locks_list_guard);
1386 }
1387 EXPORT_SYMBOL(__class_export_del_lock_ref);
1388 #endif
1389
1390 /* A connection defines an export context in which preallocation can
1391    be managed. This releases the export pointer reference, and returns
1392    the export handle, so the export refcount is 1 when this function
1393    returns. */
1394 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1395                   struct obd_uuid *cluuid)
1396 {
1397         struct obd_export *export;
1398         LASSERT(conn != NULL);
1399         LASSERT(obd != NULL);
1400         LASSERT(cluuid != NULL);
1401         ENTRY;
1402
1403         export = class_new_export(obd, cluuid);
1404         if (IS_ERR(export))
1405                 RETURN(PTR_ERR(export));
1406
1407         conn->cookie = export->exp_handle.h_cookie;
1408         class_export_put(export);
1409
1410         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1411                cluuid->uuid, conn->cookie);
1412         RETURN(0);
1413 }
1414 EXPORT_SYMBOL(class_connect);
1415
1416 /* if export is involved in recovery then clean up related things */
1417 static void class_export_recovery_cleanup(struct obd_export *exp)
1418 {
1419         struct obd_device *obd = exp->exp_obd;
1420
1421         spin_lock(&obd->obd_recovery_task_lock);
1422         if (obd->obd_recovering) {
1423                 if (exp->exp_in_recovery) {
1424                         spin_lock(&exp->exp_lock);
1425                         exp->exp_in_recovery = 0;
1426                         spin_unlock(&exp->exp_lock);
1427                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1428                         atomic_dec(&obd->obd_connected_clients);
1429                 }
1430
1431                 /* if called during recovery then should update
1432                  * obd_stale_clients counter,
1433                  * lightweight exports are not counted */
1434                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1435                         exp->exp_obd->obd_stale_clients++;
1436         }
1437         spin_unlock(&obd->obd_recovery_task_lock);
1438
1439         spin_lock(&exp->exp_lock);
1440         /** Cleanup req replay fields */
1441         if (exp->exp_req_replay_needed) {
1442                 exp->exp_req_replay_needed = 0;
1443
1444                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1445                 atomic_dec(&obd->obd_req_replay_clients);
1446         }
1447
1448         /** Cleanup lock replay data */
1449         if (exp->exp_lock_replay_needed) {
1450                 exp->exp_lock_replay_needed = 0;
1451
1452                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1453                 atomic_dec(&obd->obd_lock_replay_clients);
1454         }
1455         spin_unlock(&exp->exp_lock);
1456 }
1457
1458 /* This function removes 1-3 references from the export:
1459  * 1 - for export pointer passed
1460  * and if disconnect really need
1461  * 2 - removing from hash
1462  * 3 - in client_unlink_export
1463  * The export pointer passed to this function can destroyed */
1464 int class_disconnect(struct obd_export *export)
1465 {
1466         int already_disconnected;
1467         ENTRY;
1468
1469         if (export == NULL) {
1470                 CWARN("attempting to free NULL export %p\n", export);
1471                 RETURN(-EINVAL);
1472         }
1473
1474         spin_lock(&export->exp_lock);
1475         already_disconnected = export->exp_disconnected;
1476         export->exp_disconnected = 1;
1477         /*  We hold references of export for uuid hash
1478          *  and nid_hash and export link at least. So
1479          *  it is safe to call cfs_hash_del in there.  */
1480         if (!hlist_unhashed(&export->exp_nid_hash))
1481                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1482                              &export->exp_connection->c_peer.nid,
1483                              &export->exp_nid_hash);
1484         spin_unlock(&export->exp_lock);
1485
1486         /* class_cleanup(), abort_recovery(), and class_fail_export()
1487          * all end up in here, and if any of them race we shouldn't
1488          * call extra class_export_puts(). */
1489         if (already_disconnected) {
1490                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1491                 GOTO(no_disconn, already_disconnected);
1492         }
1493
1494         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1495                export->exp_handle.h_cookie);
1496
1497         class_export_recovery_cleanup(export);
1498         class_unlink_export(export);
1499 no_disconn:
1500         class_export_put(export);
1501         RETURN(0);
1502 }
1503 EXPORT_SYMBOL(class_disconnect);
1504
1505 /* Return non-zero for a fully connected export */
1506 int class_connected_export(struct obd_export *exp)
1507 {
1508         int connected = 0;
1509
1510         if (exp) {
1511                 spin_lock(&exp->exp_lock);
1512                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1513                 spin_unlock(&exp->exp_lock);
1514         }
1515         return connected;
1516 }
1517 EXPORT_SYMBOL(class_connected_export);
1518
1519 static void class_disconnect_export_list(struct list_head *list,
1520                                          enum obd_option flags)
1521 {
1522         int rc;
1523         struct obd_export *exp;
1524         ENTRY;
1525
1526         /* It's possible that an export may disconnect itself, but
1527          * nothing else will be added to this list. */
1528         while (!list_empty(list)) {
1529                 exp = list_entry(list->next, struct obd_export,
1530                                  exp_obd_chain);
1531                 /* need for safe call CDEBUG after obd_disconnect */
1532                 class_export_get(exp);
1533
1534                 spin_lock(&exp->exp_lock);
1535                 exp->exp_flags = flags;
1536                 spin_unlock(&exp->exp_lock);
1537
1538                 if (obd_uuid_equals(&exp->exp_client_uuid,
1539                                     &exp->exp_obd->obd_uuid)) {
1540                         CDEBUG(D_HA,
1541                                "exp %p export uuid == obd uuid, don't discon\n",
1542                                exp);
1543                         /* Need to delete this now so we don't end up pointing
1544                          * to work_list later when this export is cleaned up. */
1545                         list_del_init(&exp->exp_obd_chain);
1546                         class_export_put(exp);
1547                         continue;
1548                 }
1549
1550                 class_export_get(exp);
1551                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1552                        "last request at %lld\n",
1553                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1554                        exp, exp->exp_last_request_time);
1555                 /* release one export reference anyway */
1556                 rc = obd_disconnect(exp);
1557
1558                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1559                        obd_export_nid2str(exp), exp, rc);
1560                 class_export_put(exp);
1561         }
1562         EXIT;
1563 }
1564
1565 void class_disconnect_exports(struct obd_device *obd)
1566 {
1567         struct list_head work_list;
1568         ENTRY;
1569
1570         /* Move all of the exports from obd_exports to a work list, en masse. */
1571         INIT_LIST_HEAD(&work_list);
1572         spin_lock(&obd->obd_dev_lock);
1573         list_splice_init(&obd->obd_exports, &work_list);
1574         list_splice_init(&obd->obd_delayed_exports, &work_list);
1575         spin_unlock(&obd->obd_dev_lock);
1576
1577         if (!list_empty(&work_list)) {
1578                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1579                        "disconnecting them\n", obd->obd_minor, obd);
1580                 class_disconnect_export_list(&work_list,
1581                                              exp_flags_from_obd(obd));
1582         } else
1583                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1584                        obd->obd_minor, obd);
1585         EXIT;
1586 }
1587 EXPORT_SYMBOL(class_disconnect_exports);
1588
1589 /* Remove exports that have not completed recovery.
1590  */
1591 void class_disconnect_stale_exports(struct obd_device *obd,
1592                                     int (*test_export)(struct obd_export *))
1593 {
1594         struct list_head work_list;
1595         struct obd_export *exp, *n;
1596         int evicted = 0;
1597         ENTRY;
1598
1599         INIT_LIST_HEAD(&work_list);
1600         spin_lock(&obd->obd_dev_lock);
1601         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1602                                  exp_obd_chain) {
1603                 /* don't count self-export as client */
1604                 if (obd_uuid_equals(&exp->exp_client_uuid,
1605                                     &exp->exp_obd->obd_uuid))
1606                         continue;
1607
1608                 /* don't evict clients which have no slot in last_rcvd
1609                  * (e.g. lightweight connection) */
1610                 if (exp->exp_target_data.ted_lr_idx == -1)
1611                         continue;
1612
1613                 spin_lock(&exp->exp_lock);
1614                 if (exp->exp_failed || test_export(exp)) {
1615                         spin_unlock(&exp->exp_lock);
1616                         continue;
1617                 }
1618                 exp->exp_failed = 1;
1619                 spin_unlock(&exp->exp_lock);
1620
1621                 list_move(&exp->exp_obd_chain, &work_list);
1622                 evicted++;
1623                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1624                        obd->obd_name, exp->exp_client_uuid.uuid,
1625                        obd_export_nid2str(exp));
1626                 print_export_data(exp, "EVICTING", 0, D_HA);
1627         }
1628         spin_unlock(&obd->obd_dev_lock);
1629
1630         if (evicted)
1631                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1632                               obd->obd_name, evicted);
1633
1634         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1635                                                  OBD_OPT_ABORT_RECOV);
1636         EXIT;
1637 }
1638 EXPORT_SYMBOL(class_disconnect_stale_exports);
1639
1640 void class_fail_export(struct obd_export *exp)
1641 {
1642         int rc, already_failed;
1643
1644         spin_lock(&exp->exp_lock);
1645         already_failed = exp->exp_failed;
1646         exp->exp_failed = 1;
1647         spin_unlock(&exp->exp_lock);
1648
1649         if (already_failed) {
1650                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1651                        exp, exp->exp_client_uuid.uuid);
1652                 return;
1653         }
1654
1655         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1656                exp, exp->exp_client_uuid.uuid);
1657
1658         if (obd_dump_on_timeout)
1659                 libcfs_debug_dumplog();
1660
1661         /* need for safe call CDEBUG after obd_disconnect */
1662         class_export_get(exp);
1663
1664         /* Most callers into obd_disconnect are removing their own reference
1665          * (request, for example) in addition to the one from the hash table.
1666          * We don't have such a reference here, so make one. */
1667         class_export_get(exp);
1668         rc = obd_disconnect(exp);
1669         if (rc)
1670                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1671         else
1672                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1673                        exp, exp->exp_client_uuid.uuid);
1674         class_export_put(exp);
1675 }
1676 EXPORT_SYMBOL(class_fail_export);
1677
1678 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1679 {
1680         struct cfs_hash *nid_hash;
1681         struct obd_export *doomed_exp = NULL;
1682         int exports_evicted = 0;
1683
1684         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1685
1686         spin_lock(&obd->obd_dev_lock);
1687         /* umount has run already, so evict thread should leave
1688          * its task to umount thread now */
1689         if (obd->obd_stopping) {
1690                 spin_unlock(&obd->obd_dev_lock);
1691                 return exports_evicted;
1692         }
1693         nid_hash = obd->obd_nid_hash;
1694         cfs_hash_getref(nid_hash);
1695         spin_unlock(&obd->obd_dev_lock);
1696
1697         do {
1698                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1699                 if (doomed_exp == NULL)
1700                         break;
1701
1702                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1703                          "nid %s found, wanted nid %s, requested nid %s\n",
1704                          obd_export_nid2str(doomed_exp),
1705                          libcfs_nid2str(nid_key), nid);
1706                 LASSERTF(doomed_exp != obd->obd_self_export,
1707                          "self-export is hashed by NID?\n");
1708                 exports_evicted++;
1709                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1710                               "request\n", obd->obd_name,
1711                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1712                               obd_export_nid2str(doomed_exp));
1713                 class_fail_export(doomed_exp);
1714                 class_export_put(doomed_exp);
1715         } while (1);
1716
1717         cfs_hash_putref(nid_hash);
1718
1719         if (!exports_evicted)
1720                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1721                        obd->obd_name, nid);
1722         return exports_evicted;
1723 }
1724 EXPORT_SYMBOL(obd_export_evict_by_nid);
1725
1726 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1727 {
1728         struct cfs_hash *uuid_hash;
1729         struct obd_export *doomed_exp = NULL;
1730         struct obd_uuid doomed_uuid;
1731         int exports_evicted = 0;
1732
1733         spin_lock(&obd->obd_dev_lock);
1734         if (obd->obd_stopping) {
1735                 spin_unlock(&obd->obd_dev_lock);
1736                 return exports_evicted;
1737         }
1738         uuid_hash = obd->obd_uuid_hash;
1739         cfs_hash_getref(uuid_hash);
1740         spin_unlock(&obd->obd_dev_lock);
1741
1742         obd_str2uuid(&doomed_uuid, uuid);
1743         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1744                 CERROR("%s: can't evict myself\n", obd->obd_name);
1745                 cfs_hash_putref(uuid_hash);
1746                 return exports_evicted;
1747         }
1748
1749         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1750
1751         if (doomed_exp == NULL) {
1752                 CERROR("%s: can't disconnect %s: no exports found\n",
1753                        obd->obd_name, uuid);
1754         } else {
1755                 CWARN("%s: evicting %s at adminstrative request\n",
1756                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1757                 class_fail_export(doomed_exp);
1758                 class_export_put(doomed_exp);
1759                 exports_evicted++;
1760         }
1761         cfs_hash_putref(uuid_hash);
1762
1763         return exports_evicted;
1764 }
1765
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; when it is set, print_export_data() calls it for
 * an export if the caller asked for lock information. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1770
1771 static void print_export_data(struct obd_export *exp, const char *status,
1772                               int locks, int debug_level)
1773 {
1774         struct ptlrpc_reply_state *rs;
1775         struct ptlrpc_reply_state *first_reply = NULL;
1776         int nreplies = 0;
1777
1778         spin_lock(&exp->exp_lock);
1779         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1780                             rs_exp_list) {
1781                 if (nreplies == 0)
1782                         first_reply = rs;
1783                 nreplies++;
1784         }
1785         spin_unlock(&exp->exp_lock);
1786
1787         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1788                "%p %s %llu stale:%d\n",
1789                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1790                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1791                atomic_read(&exp->exp_rpc_count),
1792                atomic_read(&exp->exp_cb_count),
1793                atomic_read(&exp->exp_locks_count),
1794                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1795                nreplies, first_reply, nreplies > 3 ? "..." : "",
1796                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1797 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1798         if (locks && class_export_dump_hook != NULL)
1799                 class_export_dump_hook(exp);
1800 #endif
1801 }
1802
1803 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1804 {
1805         struct obd_export *exp;
1806
1807         spin_lock(&obd->obd_dev_lock);
1808         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1809                 print_export_data(exp, "ACTIVE", locks, debug_level);
1810         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1811                 print_export_data(exp, "UNLINKED", locks, debug_level);
1812         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1813                 print_export_data(exp, "DELAYED", locks, debug_level);
1814         spin_unlock(&obd->obd_dev_lock);
1815 }
1816
1817 void obd_exports_barrier(struct obd_device *obd)
1818 {
1819         int waited = 2;
1820         LASSERT(list_empty(&obd->obd_exports));
1821         spin_lock(&obd->obd_dev_lock);
1822         while (!list_empty(&obd->obd_unlinked_exports)) {
1823                 spin_unlock(&obd->obd_dev_lock);
1824                 set_current_state(TASK_UNINTERRUPTIBLE);
1825                 schedule_timeout(cfs_time_seconds(waited));
1826                 if (waited > 5 && is_power_of_2(waited)) {
1827                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1828                                       "more than %d seconds. "
1829                                       "The obd refcount = %d. Is it stuck?\n",
1830                                       obd->obd_name, waited,
1831                                       atomic_read(&obd->obd_refcount));
1832                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1833                 }
1834                 waited *= 2;
1835                 spin_lock(&obd->obd_dev_lock);
1836         }
1837         spin_unlock(&obd->obd_dev_lock);
1838 }
1839 EXPORT_SYMBOL(obd_exports_barrier);
1840
1841 /**
1842  * Add export to the obd_zombe thread and notify it.
1843  */
1844 static void obd_zombie_export_add(struct obd_export *exp) {
1845         atomic_dec(&obd_stale_export_num);
1846         spin_lock(&exp->exp_obd->obd_dev_lock);
1847         LASSERT(!list_empty(&exp->exp_obd_chain));
1848         list_del_init(&exp->exp_obd_chain);
1849         spin_unlock(&exp->exp_obd->obd_dev_lock);
1850
1851         queue_work(zombie_wq, &exp->exp_zombie_work);
1852 }
1853
1854 /**
1855  * Add import to the obd_zombe thread and notify it.
1856  */
1857 static void obd_zombie_import_add(struct obd_import *imp) {
1858         LASSERT(imp->imp_sec == NULL);
1859
1860         queue_work(zombie_wq, &imp->imp_zombie_work);
1861 }
1862
1863 /**
1864  * wait when obd_zombie import/export queues become empty
1865  */
1866 void obd_zombie_barrier(void)
1867 {
1868         flush_workqueue(zombie_wq);
1869 }
1870 EXPORT_SYMBOL(obd_zombie_barrier);
1871
1872
1873 struct obd_export *obd_stale_export_get(void)
1874 {
1875         struct obd_export *exp = NULL;
1876         ENTRY;
1877
1878         spin_lock(&obd_stale_export_lock);
1879         if (!list_empty(&obd_stale_exports)) {
1880                 exp = list_entry(obd_stale_exports.next,
1881                                  struct obd_export, exp_stale_list);
1882                 list_del_init(&exp->exp_stale_list);
1883         }
1884         spin_unlock(&obd_stale_export_lock);
1885
1886         if (exp) {
1887                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1888                        atomic_read(&obd_stale_export_num));
1889         }
1890         RETURN(exp);
1891 }
1892 EXPORT_SYMBOL(obd_stale_export_get);
1893
1894 void obd_stale_export_put(struct obd_export *exp)
1895 {
1896         ENTRY;
1897
1898         LASSERT(list_empty(&exp->exp_stale_list));
1899         if (exp->exp_lock_hash &&
1900             atomic_read(&exp->exp_lock_hash->hs_count)) {
1901                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1902                        atomic_read(&obd_stale_export_num));
1903
1904                 spin_lock_bh(&exp->exp_bl_list_lock);
1905                 spin_lock(&obd_stale_export_lock);
1906                 /* Add to the tail if there is no blocked locks,
1907                  * to the head otherwise. */
1908                 if (list_empty(&exp->exp_bl_list))
1909                         list_add_tail(&exp->exp_stale_list,
1910                                       &obd_stale_exports);
1911                 else
1912                         list_add(&exp->exp_stale_list,
1913                                  &obd_stale_exports);
1914
1915                 spin_unlock(&obd_stale_export_lock);
1916                 spin_unlock_bh(&exp->exp_bl_list_lock);
1917         } else {
1918                 class_export_put(exp);
1919         }
1920         EXIT;
1921 }
1922 EXPORT_SYMBOL(obd_stale_export_put);
1923
1924 /**
1925  * Adjust the position of the export in the stale list,
1926  * i.e. move to the head of the list if is needed.
1927  **/
1928 void obd_stale_export_adjust(struct obd_export *exp)
1929 {
1930         LASSERT(exp != NULL);
1931         spin_lock_bh(&exp->exp_bl_list_lock);
1932         spin_lock(&obd_stale_export_lock);
1933
1934         if (!list_empty(&exp->exp_stale_list) &&
1935             !list_empty(&exp->exp_bl_list))
1936                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1937
1938         spin_unlock(&obd_stale_export_lock);
1939         spin_unlock_bh(&exp->exp_bl_list_lock);
1940 }
1941 EXPORT_SYMBOL(obd_stale_export_adjust);
1942
1943 /**
1944  * start destroy zombie import/export thread
1945  */
1946 int obd_zombie_impexp_init(void)
1947 {
1948         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1949         if (!zombie_wq)
1950                 return -ENOMEM;
1951
1952         return 0;
1953 }
1954
1955 /**
1956  * stop destroy zombie import/export thread
1957  */
1958 void obd_zombie_impexp_stop(void)
1959 {
1960         destroy_workqueue(zombie_wq);
1961         LASSERT(list_empty(&obd_stale_exports));
1962 }
1963
1964 /***** Kernel-userspace comm helpers *******/
1965
1966 /* Get length of entire message, including header */
1967 int kuc_len(int payload_len)
1968 {
1969         return sizeof(struct kuc_hdr) + payload_len;
1970 }
1971 EXPORT_SYMBOL(kuc_len);
1972
1973 /* Get a pointer to kuc header, given a ptr to the payload
1974  * @param p Pointer to payload area
1975  * @returns Pointer to kuc header
1976  */
1977 struct kuc_hdr * kuc_ptr(void *p)
1978 {
1979         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1980         LASSERT(lh->kuc_magic == KUC_MAGIC);
1981         return lh;
1982 }
1983 EXPORT_SYMBOL(kuc_ptr);
1984
1985 /* Alloc space for a message, and fill in header
1986  * @return Pointer to payload area
1987  */
1988 void *kuc_alloc(int payload_len, int transport, int type)
1989 {
1990         struct kuc_hdr *lh;
1991         int len = kuc_len(payload_len);
1992
1993         OBD_ALLOC(lh, len);
1994         if (lh == NULL)
1995                 return ERR_PTR(-ENOMEM);
1996
1997         lh->kuc_magic = KUC_MAGIC;
1998         lh->kuc_transport = transport;
1999         lh->kuc_msgtype = type;
2000         lh->kuc_msglen = len;
2001
2002         return (void *)(lh + 1);
2003 }
2004 EXPORT_SYMBOL(kuc_alloc);
2005
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
2012 EXPORT_SYMBOL(kuc_free);
2013
2014 struct obd_request_slot_waiter {
2015         struct list_head        orsw_entry;
2016         wait_queue_head_t       orsw_waitq;
2017         bool                    orsw_signaled;
2018 };
2019
2020 static bool obd_request_slot_avail(struct client_obd *cli,
2021                                    struct obd_request_slot_waiter *orsw)
2022 {
2023         bool avail;
2024
2025         spin_lock(&cli->cl_loi_list_lock);
2026         avail = !!list_empty(&orsw->orsw_entry);
2027         spin_unlock(&cli->cl_loi_list_lock);
2028
2029         return avail;
2030 };
2031
2032 /*
2033  * For network flow control, the RPC sponsor needs to acquire a credit
2034  * before sending the RPC. The credits count for a connection is defined
2035  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2036  * the subsequent RPC sponsors need to wait until others released their
2037  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2038  */
2039 int obd_get_request_slot(struct client_obd *cli)
2040 {
2041         struct obd_request_slot_waiter   orsw;
2042         struct l_wait_info               lwi;
2043         int                              rc;
2044
2045         spin_lock(&cli->cl_loi_list_lock);
2046         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2047                 cli->cl_rpcs_in_flight++;
2048                 spin_unlock(&cli->cl_loi_list_lock);
2049                 return 0;
2050         }
2051
2052         init_waitqueue_head(&orsw.orsw_waitq);
2053         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2054         orsw.orsw_signaled = false;
2055         spin_unlock(&cli->cl_loi_list_lock);
2056
2057         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2058         rc = l_wait_event(orsw.orsw_waitq,
2059                           obd_request_slot_avail(cli, &orsw) ||
2060                           orsw.orsw_signaled,
2061                           &lwi);
2062
2063         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2064          * freed but other (such as obd_put_request_slot) is using it. */
2065         spin_lock(&cli->cl_loi_list_lock);
2066         if (rc != 0) {
2067                 if (!orsw.orsw_signaled) {
2068                         if (list_empty(&orsw.orsw_entry))
2069                                 cli->cl_rpcs_in_flight--;
2070                         else
2071                                 list_del(&orsw.orsw_entry);
2072                 }
2073         }
2074
2075         if (orsw.orsw_signaled) {
2076                 LASSERT(list_empty(&orsw.orsw_entry));
2077
2078                 rc = -EINTR;
2079         }
2080         spin_unlock(&cli->cl_loi_list_lock);
2081
2082         return rc;
2083 }
2084 EXPORT_SYMBOL(obd_get_request_slot);
2085
2086 void obd_put_request_slot(struct client_obd *cli)
2087 {
2088         struct obd_request_slot_waiter *orsw;
2089
2090         spin_lock(&cli->cl_loi_list_lock);
2091         cli->cl_rpcs_in_flight--;
2092
2093         /* If there is free slot, wakeup the first waiter. */
2094         if (!list_empty(&cli->cl_flight_waiters) &&
2095             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2096                 orsw = list_entry(cli->cl_flight_waiters.next,
2097                                   struct obd_request_slot_waiter, orsw_entry);
2098                 list_del_init(&orsw->orsw_entry);
2099                 cli->cl_rpcs_in_flight++;
2100                 wake_up(&orsw->orsw_waitq);
2101         }
2102         spin_unlock(&cli->cl_loi_list_lock);
2103 }
2104 EXPORT_SYMBOL(obd_put_request_slot);
2105
2106 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2107 {
2108         return cli->cl_max_rpcs_in_flight;
2109 }
2110 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2111
2112 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2113 {
2114         struct obd_request_slot_waiter *orsw;
2115         __u32                           old;
2116         int                             diff;
2117         int                             i;
2118         const char *type_name;
2119         int                             rc;
2120
2121         if (max > OBD_MAX_RIF_MAX || max < 1)
2122                 return -ERANGE;
2123
2124         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2125         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2126                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2127                  * strictly lower that max_rpcs_in_flight */
2128                 if (max < 2) {
2129                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2130                                "because it must be higher than "
2131                                "max_mod_rpcs_in_flight value",
2132                                cli->cl_import->imp_obd->obd_name);
2133                         return -ERANGE;
2134                 }
2135                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2136                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2137                         if (rc != 0)
2138                                 return rc;
2139                 }
2140         }
2141
2142         spin_lock(&cli->cl_loi_list_lock);
2143         old = cli->cl_max_rpcs_in_flight;
2144         cli->cl_max_rpcs_in_flight = max;
2145         client_adjust_max_dirty(cli);
2146
2147         diff = max - old;
2148
2149         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2150         for (i = 0; i < diff; i++) {
2151                 if (list_empty(&cli->cl_flight_waiters))
2152                         break;
2153
2154                 orsw = list_entry(cli->cl_flight_waiters.next,
2155                                   struct obd_request_slot_waiter, orsw_entry);
2156                 list_del_init(&orsw->orsw_entry);
2157                 cli->cl_rpcs_in_flight++;
2158                 wake_up(&orsw->orsw_waitq);
2159         }
2160         spin_unlock(&cli->cl_loi_list_lock);
2161
2162         return 0;
2163 }
2164 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2165
2166 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2167 {
2168         return cli->cl_max_mod_rpcs_in_flight;
2169 }
2170 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2171
2172 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2173 {
2174         struct obd_connect_data *ocd;
2175         __u16 maxmodrpcs;
2176         __u16 prev;
2177
2178         if (max > OBD_MAX_RIF_MAX || max < 1)
2179                 return -ERANGE;
2180
2181         /* cannot exceed or equal max_rpcs_in_flight */
2182         if (max >= cli->cl_max_rpcs_in_flight) {
2183                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2184                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2185                        cli->cl_import->imp_obd->obd_name,
2186                        max, cli->cl_max_rpcs_in_flight);
2187                 return -ERANGE;
2188         }
2189
2190         /* cannot exceed max modify RPCs in flight supported by the server */
2191         ocd = &cli->cl_import->imp_connect_data;
2192         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2193                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2194         else
2195                 maxmodrpcs = 1;
2196         if (max > maxmodrpcs) {
2197                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2198                        "higher than max_mod_rpcs_per_client value (%hu) "
2199                        "returned by the server at connection\n",
2200                        cli->cl_import->imp_obd->obd_name,
2201                        max, maxmodrpcs);
2202                 return -ERANGE;
2203         }
2204
2205         spin_lock(&cli->cl_mod_rpcs_lock);
2206
2207         prev = cli->cl_max_mod_rpcs_in_flight;
2208         cli->cl_max_mod_rpcs_in_flight = max;
2209
2210         /* wakeup waiters if limit has been increased */
2211         if (cli->cl_max_mod_rpcs_in_flight > prev)
2212                 wake_up(&cli->cl_mod_rpcs_waitq);
2213
2214         spin_unlock(&cli->cl_mod_rpcs_lock);
2215
2216         return 0;
2217 }
2218 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2219
2220 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2221                                struct seq_file *seq)
2222 {
2223         unsigned long mod_tot = 0, mod_cum;
2224         struct timespec64 now;
2225         int i;
2226
2227         ktime_get_real_ts64(&now);
2228
2229         spin_lock(&cli->cl_mod_rpcs_lock);
2230
2231         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2232                    (s64)now.tv_sec, now.tv_nsec);
2233         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2234                    cli->cl_mod_rpcs_in_flight);
2235
2236         seq_printf(seq, "\n\t\t\tmodify\n");
2237         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2238
2239         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2240
2241         mod_cum = 0;
2242         for (i = 0; i < OBD_HIST_MAX; i++) {
2243                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2244                 mod_cum += mod;
2245                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2246                            i, mod, pct(mod, mod_tot),
2247                            pct(mod_cum, mod_tot));
2248                 if (mod_cum == mod_tot)
2249                         break;
2250         }
2251
2252         spin_unlock(&cli->cl_mod_rpcs_lock);
2253
2254         return 0;
2255 }
2256 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2257
2258 /* The number of modify RPCs sent in parallel is limited
2259  * because the server has a finite number of slots per client to
2260  * store request result and ensure reply reconstruction when needed.
2261  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2262  * that takes into account server limit and cl_max_rpcs_in_flight
2263  * value.
2264  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2265  * one close request is allowed above the maximum.
2266  */
2267 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2268                                                  bool close_req)
2269 {
2270         bool avail;
2271
2272         /* A slot is available if
2273          * - number of modify RPCs in flight is less than the max
2274          * - it's a close RPC and no other close request is in flight
2275          */
2276         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2277                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2278
2279         return avail;
2280 }
2281
2282 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2283                                          bool close_req)
2284 {
2285         bool avail;
2286
2287         spin_lock(&cli->cl_mod_rpcs_lock);
2288         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2289         spin_unlock(&cli->cl_mod_rpcs_lock);
2290         return avail;
2291 }
2292
2293 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2294 {
2295         if (it != NULL &&
2296             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2297              it->it_op == IT_READDIR ||
2298              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2299                         return true;
2300         return false;
2301 }
2302
2303 /* Get a modify RPC slot from the obd client @cli according
2304  * to the kind of operation @opc that is going to be sent
2305  * and the intent @it of the operation if it applies.
2306  * If the maximum number of modify RPCs in flight is reached
2307  * the thread is put to sleep.
2308  * Returns the tag to be set in the request message. Tag 0
2309  * is reserved for non-modifying requests.
2310  */
2311 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2312                            struct lookup_intent *it)
2313 {
2314         bool                    close_req = false;
2315         __u16                   i, max;
2316
2317         /* read-only metadata RPCs don't consume a slot on MDT
2318          * for reply reconstruction
2319          */
2320         if (obd_skip_mod_rpc_slot(it))
2321                 return 0;
2322
2323         if (opc == MDS_CLOSE)
2324                 close_req = true;
2325
2326         do {
2327                 spin_lock(&cli->cl_mod_rpcs_lock);
2328                 max = cli->cl_max_mod_rpcs_in_flight;
2329                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2330                         /* there is a slot available */
2331                         cli->cl_mod_rpcs_in_flight++;
2332                         if (close_req)
2333                                 cli->cl_close_rpcs_in_flight++;
2334                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2335                                          cli->cl_mod_rpcs_in_flight);
2336                         /* find a free tag */
2337                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2338                                                 max + 1);
2339                         LASSERT(i < OBD_MAX_RIF_MAX);
2340                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2341                         spin_unlock(&cli->cl_mod_rpcs_lock);
2342                         /* tag 0 is reserved for non-modify RPCs */
2343
2344                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2345                                "opc %u, max %hu\n",
2346                                cli->cl_import->imp_obd->obd_name,
2347                                i + 1, opc, max);
2348
2349                         return i + 1;
2350                 }
2351                 spin_unlock(&cli->cl_mod_rpcs_lock);
2352
2353                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2354                        "opc %u, max %hu\n",
2355                        cli->cl_import->imp_obd->obd_name, opc, max);
2356
2357                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2358                                           obd_mod_rpc_slot_avail(cli,
2359                                                                  close_req));
2360         } while (true);
2361 }
2362 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2363
2364 /* Put a modify RPC slot from the obd client @cli according
2365  * to the kind of operation @opc that has been sent and the
2366  * intent @it of the operation if it applies.
2367  */
2368 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2369                           struct lookup_intent *it, __u16 tag)
2370 {
2371         bool                    close_req = false;
2372
2373         if (obd_skip_mod_rpc_slot(it))
2374                 return;
2375
2376         if (opc == MDS_CLOSE)
2377                 close_req = true;
2378
2379         spin_lock(&cli->cl_mod_rpcs_lock);
2380         cli->cl_mod_rpcs_in_flight--;
2381         if (close_req)
2382                 cli->cl_close_rpcs_in_flight--;
2383         /* release the tag in the bitmap */
2384         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2385         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2386         spin_unlock(&cli->cl_mod_rpcs_lock);
2387         wake_up(&cli->cl_mod_rpcs_waitq);
2388 }
2389 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2390