Whamcloud - gitweb
0d670daea52c1ee33889d82063cf6ad885937f7b
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
178 static struct kobj_type class_ktype = {
179         .sysfs_ops      = &lustre_sysfs_ops,
180         .release        = class_sysfs_release,
181 };
182
#ifdef HAVE_SERVER_SUPPORT
/**
 * Register a placeholder obd type that only owns a sysfs kobject, a debugfs
 * directory and, optionally, a procfs directory.
 *
 * The placeholder is flagged with typ_sym_filter so that a later
 * class_register_type() call for the same name reuses it instead of
 * failing with -EEXIST.
 *
 * \param[in] name		type name, also used for the directories
 * \param[in] enable_proc	also create a procfs directory; failure to
 *				do so is logged but not fatal
 *
 * \retval type			the new placeholder type
 * \retval ERR_PTR(-EEXIST)	a type with this name is already registered
 * \retval ERR_PTR(-ENOMEM)	allocation failure
 * \retval ERR_PTR(errno)	kobject or debugfs registration failure
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/*
		 * The kobject is initialized even when adding it failed, so
		 * the reference must be dropped; class_sysfs_release() then
		 * frees the type instead of leaking it.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	if (IS_ERR_OR_NULL(symlink)) {
		rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			/* Best effort only: carry on without the proc entry. */
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
229
230 #define CLASS_MAX_NAME 1024
231
232 int class_register_type(const struct obd_ops *dt_ops,
233                         const struct md_ops *md_ops,
234                         bool enable_proc, struct lprocfs_vars *vars,
235                         const char *name, struct lu_device_type *ldt)
236 {
237         struct obd_type *type;
238         int rc;
239
240         ENTRY;
241         /* sanity check */
242         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
243
244         type = class_search_type(name);
245         if (type) {
246 #ifdef HAVE_SERVER_SUPPORT
247                 if (type->typ_sym_filter)
248                         goto dir_exist;
249 #endif /* HAVE_SERVER_SUPPORT */
250                 kobject_put(&type->typ_kobj);
251                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
252                 RETURN(-EEXIST);
253         }
254
255         OBD_ALLOC(type, sizeof(*type));
256         if (type == NULL)
257                 RETURN(-ENOMEM);
258
259         type->typ_kobj.kset = lustre_kset;
260         kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
262 dir_exist:
263 #endif /* HAVE_SERVER_SUPPORT */
264
265         type->typ_dt_ops = dt_ops;
266         type->typ_md_ops = md_ops;
267
268 #ifdef HAVE_SERVER_SUPPORT
269         if (type->typ_sym_filter) {
270                 type->typ_sym_filter = false;
271                 kobject_put(&type->typ_kobj);
272                 goto setup_ldt;
273         }
274 #endif
275 #ifdef CONFIG_PROC_FS
276         if (enable_proc && !type->typ_procroot) {
277                 type->typ_procroot = lprocfs_register(name,
278                                                       proc_lustre_root,
279                                                       NULL, type);
280                 if (IS_ERR(type->typ_procroot)) {
281                         rc = PTR_ERR(type->typ_procroot);
282                         type->typ_procroot = NULL;
283                         GOTO(failed, rc);
284                 }
285         }
286 #endif
287         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
288                                                     vars, type);
289         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
291                                              : -ENOMEM;
292                 type->typ_debugfs_entry = NULL;
293                 GOTO(failed, rc);
294         }
295
296         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
297         if (rc)
298                 GOTO(failed, rc);
299 #ifdef HAVE_SERVER_SUPPORT
300 setup_ldt:
301 #endif
302         if (ldt) {
303                 type->typ_lu = ldt;
304                 rc = lu_device_type_init(ldt);
305                 if (rc)
306                         GOTO(failed, rc);
307         }
308
309         RETURN(0);
310
311 failed:
312         kobject_put(&type->typ_kobj);
313
314         RETURN(rc);
315 }
316 EXPORT_SYMBOL(class_register_type);
317
318 int class_unregister_type(const char *name)
319 {
320         struct obd_type *type = class_search_type(name);
321         int rc = 0;
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (atomic_read(&type->typ_refcnt)) {
330                 CERROR("type %s has refcount (%d)\n", name,
331                        atomic_read(&type->typ_refcnt));
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 type->typ_dt_ops = NULL;
335                 type->typ_md_ops = NULL;
336                 GOTO(out_put, rc = -EBUSY);
337         }
338
339         /* Put the final ref */
340         kobject_put(&type->typ_kobj);
341 out_put:
342         /* Put the ref returned by class_search_type() */
343         kobject_put(&type->typ_kobj);
344
345         RETURN(rc);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
348
349 /**
350  * Create a new obd device.
351  *
352  * Allocate the new obd_device and initialize it.
353  *
354  * \param[in] type_name obd device type string.
355  * \param[in] name      obd device name.
356  * \param[in] uuid      obd device UUID
357  *
358  * \retval newdev         pointer to created obd_device
359  * \retval ERR_PTR(errno) on error
360  */
361 struct obd_device *class_newdev(const char *type_name, const char *name,
362                                 const char *uuid)
363 {
364         struct obd_device *newdev;
365         struct obd_type *type = NULL;
366         ENTRY;
367
368         if (strlen(name) >= MAX_OBD_NAME) {
369                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370                 RETURN(ERR_PTR(-EINVAL));
371         }
372
373         type = class_get_type(type_name);
374         if (type == NULL){
375                 CERROR("OBD: unknown type: %s\n", type_name);
376                 RETURN(ERR_PTR(-ENODEV));
377         }
378
379         newdev = obd_device_alloc();
380         if (newdev == NULL) {
381                 class_put_type(type);
382                 RETURN(ERR_PTR(-ENOMEM));
383         }
384         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386         newdev->obd_type = type;
387         newdev->obd_minor = -1;
388
389         rwlock_init(&newdev->obd_pool_lock);
390         newdev->obd_pool_limit = 0;
391         newdev->obd_pool_slv = 0;
392
393         INIT_LIST_HEAD(&newdev->obd_exports);
394         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396         INIT_LIST_HEAD(&newdev->obd_exports_timed);
397         INIT_LIST_HEAD(&newdev->obd_nid_stats);
398         spin_lock_init(&newdev->obd_nid_lock);
399         spin_lock_init(&newdev->obd_dev_lock);
400         mutex_init(&newdev->obd_dev_mutex);
401         spin_lock_init(&newdev->obd_osfs_lock);
402         /* newdev->obd_osfs_age must be set to a value in the distant
403          * past to guarantee a fresh statfs is fetched on mount. */
404         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
405
406         /* XXX belongs in setup not attach  */
407         init_rwsem(&newdev->obd_observer_link_sem);
408         /* recovery data */
409         spin_lock_init(&newdev->obd_recovery_task_lock);
410         init_waitqueue_head(&newdev->obd_next_transno_waitq);
411         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415         INIT_LIST_HEAD(&newdev->obd_evict_list);
416         INIT_LIST_HEAD(&newdev->obd_lwp_list);
417
418         llog_group_init(&newdev->obd_olg);
419         /* Detach drops this */
420         atomic_set(&newdev->obd_refcount, 1);
421         lu_ref_init(&newdev->obd_reference);
422         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
423
424         newdev->obd_conn_inprogress = 0;
425
426         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
427
428         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429                newdev->obd_name, newdev);
430
431         return newdev;
432 }
433
434 /**
435  * Free obd device.
436  *
437  * \param[in] obd obd_device to be freed
438  *
439  * \retval none
440  */
441 void class_free_dev(struct obd_device *obd)
442 {
443         struct obd_type *obd_type = obd->obd_type;
444
445         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448                  "obd %p != obd_devs[%d] %p\n",
449                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451                  "obd_refcount should be 0, not %d\n",
452                  atomic_read(&obd->obd_refcount));
453         LASSERT(obd_type != NULL);
454
455         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456                obd->obd_name, obd->obd_type->typ_name);
457
458         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459                          obd->obd_name, obd->obd_uuid.uuid);
460         if (obd->obd_stopping) {
461                 int err;
462
463                 /* If we're not stopping, we were never set up */
464                 err = obd_cleanup(obd);
465                 if (err)
466                         CERROR("Cleanup %s returned %d\n",
467                                 obd->obd_name, err);
468         }
469
470         obd_device_free(obd);
471
472         class_put_type(obd_type);
473 }
474
475 /**
476  * Unregister obd device.
477  *
478  * Free slot in obd_dev[] used by \a obd.
479  *
480  * \param[in] new_obd obd_device to be unregistered
481  *
482  * \retval none
483  */
484 void class_unregister_device(struct obd_device *obd)
485 {
486         write_lock(&obd_dev_lock);
487         if (obd->obd_minor >= 0) {
488                 LASSERT(obd_devs[obd->obd_minor] == obd);
489                 obd_devs[obd->obd_minor] = NULL;
490                 obd->obd_minor = -1;
491         }
492         write_unlock(&obd_dev_lock);
493 }
494
495 /**
496  * Register obd device.
497  *
498  * Find free slot in obd_devs[], fills it with \a new_obd.
499  *
500  * \param[in] new_obd obd_device to be registered
501  *
502  * \retval 0          success
503  * \retval -EEXIST    device with this name is registered
504  * \retval -EOVERFLOW obd_devs[] is full
505  */
506 int class_register_device(struct obd_device *new_obd)
507 {
508         int ret = 0;
509         int i;
510         int new_obd_minor = 0;
511         bool minor_assign = false;
512         bool retried = false;
513
514 again:
515         write_lock(&obd_dev_lock);
516         for (i = 0; i < class_devno_max(); i++) {
517                 struct obd_device *obd = class_num2obd(i);
518
519                 if (obd != NULL &&
520                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
521
522                         if (!retried) {
523                                 write_unlock(&obd_dev_lock);
524
525                                 /* the obd_device could be waited to be
526                                  * destroyed by the "obd_zombie_impexp_thread".
527                                  */
528                                 obd_zombie_barrier();
529                                 retried = true;
530                                 goto again;
531                         }
532
533                         CERROR("%s: already exists, won't add\n",
534                                obd->obd_name);
535                         /* in case we found a free slot before duplicate */
536                         minor_assign = false;
537                         ret = -EEXIST;
538                         break;
539                 }
540                 if (!minor_assign && obd == NULL) {
541                         new_obd_minor = i;
542                         minor_assign = true;
543                 }
544         }
545
546         if (minor_assign) {
547                 new_obd->obd_minor = new_obd_minor;
548                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550                 obd_devs[new_obd_minor] = new_obd;
551         } else {
552                 if (ret == 0) {
553                         ret = -EOVERFLOW;
554                         CERROR("%s: all %u/%u devices used, increase "
555                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556                                i, class_devno_max(), ret);
557                 }
558         }
559         write_unlock(&obd_dev_lock);
560
561         RETURN(ret);
562 }
563
564 static int class_name2dev_nolock(const char *name)
565 {
566         int i;
567
568         if (!name)
569                 return -1;
570
571         for (i = 0; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd && strcmp(name, obd->obd_name) == 0) {
575                         /* Make sure we finished attaching before we give
576                            out any references */
577                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578                         if (obd->obd_attached) {
579                                 return i;
580                         }
581                         break;
582                 }
583         }
584
585         return -1;
586 }
587
588 int class_name2dev(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         read_lock(&obd_dev_lock);
596         i = class_name2dev_nolock(name);
597         read_unlock(&obd_dev_lock);
598
599         return i;
600 }
601 EXPORT_SYMBOL(class_name2dev);
602
603 struct obd_device *class_name2obd(const char *name)
604 {
605         int dev = class_name2dev(name);
606
607         if (dev < 0 || dev > class_devno_max())
608                 return NULL;
609         return class_num2obd(dev);
610 }
611 EXPORT_SYMBOL(class_name2obd);
612
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
614 {
615         int i;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
622                         return i;
623                 }
624         }
625
626         return -1;
627 }
628
629 int class_uuid2dev(struct obd_uuid *uuid)
630 {
631         int i;
632
633         read_lock(&obd_dev_lock);
634         i = class_uuid2dev_nolock(uuid);
635         read_unlock(&obd_dev_lock);
636
637         return i;
638 }
639 EXPORT_SYMBOL(class_uuid2dev);
640
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
642 {
643         int dev = class_uuid2dev(uuid);
644         if (dev < 0)
645                 return NULL;
646         return class_num2obd(dev);
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 /**
651  * Get obd device from ::obd_devs[]
652  *
653  * \param num [in] array index
654  *
655  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656  *         otherwise return the obd device there.
657  */
658 struct obd_device *class_num2obd(int num)
659 {
660         struct obd_device *obd = NULL;
661
662         if (num < class_devno_max()) {
663                 obd = obd_devs[num];
664                 if (obd == NULL)
665                         return NULL;
666
667                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668                          "%p obd_magic %08x != %08x\n",
669                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670                 LASSERTF(obd->obd_minor == num,
671                          "%p obd_minor %0d != %0d\n",
672                          obd, obd->obd_minor, num);
673         }
674
675         return obd;
676 }
677
678 /**
679  * Find obd in obd_dev[] by name or uuid.
680  *
681  * Increment obd's refcount if found.
682  *
683  * \param[in] str obd name or uuid
684  *
685  * \retval NULL    if not found
686  * \retval target  pointer to found obd_device
687  */
688 struct obd_device *class_dev_by_str(const char *str)
689 {
690         struct obd_device *target = NULL;
691         struct obd_uuid tgtuuid;
692         int rc;
693
694         obd_str2uuid(&tgtuuid, str);
695
696         read_lock(&obd_dev_lock);
697         rc = class_uuid2dev_nolock(&tgtuuid);
698         if (rc < 0)
699                 rc = class_name2dev_nolock(str);
700
701         if (rc >= 0)
702                 target = class_num2obd(rc);
703
704         if (target != NULL)
705                 class_incref(target, "find", current);
706         read_unlock(&obd_dev_lock);
707
708         RETURN(target);
709 }
710 EXPORT_SYMBOL(class_dev_by_str);
711
712 /**
713  * Get obd devices count. Device in any
714  *    state are counted
715  * \retval obd device count
716  */
717 int get_devices_count(void)
718 {
719         int index, max_index = class_devno_max(), dev_count = 0;
720
721         read_lock(&obd_dev_lock);
722         for (index = 0; index <= max_index; index++) {
723                 struct obd_device *obd = class_num2obd(index);
724                 if (obd != NULL)
725                         dev_count++;
726         }
727         read_unlock(&obd_dev_lock);
728
729         return dev_count;
730 }
731 EXPORT_SYMBOL(get_devices_count);
732
733 void class_obd_list(void)
734 {
735         char *status;
736         int i;
737
738         read_lock(&obd_dev_lock);
739         for (i = 0; i < class_devno_max(); i++) {
740                 struct obd_device *obd = class_num2obd(i);
741
742                 if (obd == NULL)
743                         continue;
744                 if (obd->obd_stopping)
745                         status = "ST";
746                 else if (obd->obd_set_up)
747                         status = "UP";
748                 else if (obd->obd_attached)
749                         status = "AT";
750                 else
751                         status = "--";
752                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753                          i, status, obd->obd_type->typ_name,
754                          obd->obd_name, obd->obd_uuid.uuid,
755                          atomic_read(&obd->obd_refcount));
756         }
757         read_unlock(&obd_dev_lock);
758 }
759
760 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
761    specified, then only the client with that uuid is returned,
762    otherwise any client connected to the tgt is returned. */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764                                           const char *type_name,
765                                           struct obd_uuid *grp_uuid)
766 {
767         int i;
768
769         read_lock(&obd_dev_lock);
770         for (i = 0; i < class_devno_max(); i++) {
771                 struct obd_device *obd = class_num2obd(i);
772
773                 if (obd == NULL)
774                         continue;
775                 if ((strncmp(obd->obd_type->typ_name, type_name,
776                              strlen(type_name)) == 0)) {
777                         if (obd_uuid_equals(tgt_uuid,
778                                             &obd->u.cli.cl_target_uuid) &&
779                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
780                                                          &obd->obd_uuid) : 1)) {
781                                 read_unlock(&obd_dev_lock);
782                                 return obd;
783                         }
784                 }
785         }
786         read_unlock(&obd_dev_lock);
787
788         return NULL;
789 }
790 EXPORT_SYMBOL(class_find_client_obd);
791
792 /* Iterate the obd_device list looking devices have grp_uuid. Start
793    searching at *next, and if a device is found, the next index to look
794    at is saved in *next. If next is NULL, then the first matching device
795    will always be returned. */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
797 {
798         int i;
799
800         if (next == NULL)
801                 i = 0;
802         else if (*next >= 0 && *next < class_devno_max())
803                 i = *next;
804         else
805                 return NULL;
806
807         read_lock(&obd_dev_lock);
808         for (; i < class_devno_max(); i++) {
809                 struct obd_device *obd = class_num2obd(i);
810
811                 if (obd == NULL)
812                         continue;
813                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
814                         if (next != NULL)
815                                 *next = i+1;
816                         read_unlock(&obd_dev_lock);
817                         return obd;
818                 }
819         }
820         read_unlock(&obd_dev_lock);
821
822         return NULL;
823 }
824 EXPORT_SYMBOL(class_devices_in_group);
825
826 /**
827  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828  * adjust sptlrpc settings accordingly.
829  */
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
831 {
832         struct obd_device  *obd;
833         const char         *type;
834         int                 i, rc = 0, rc2;
835
836         LASSERT(namelen > 0);
837
838         read_lock(&obd_dev_lock);
839         for (i = 0; i < class_devno_max(); i++) {
840                 obd = class_num2obd(i);
841
842                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
843                         continue;
844
845                 /* only notify mdc, osc, osp, lwp, mdt, ost
846                  * because only these have a -sptlrpc llog */
847                 type = obd->obd_type->typ_name;
848                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853                     strcmp(type, LUSTRE_OST_NAME) != 0)
854                         continue;
855
856                 if (strncmp(obd->obd_name, fsname, namelen))
857                         continue;
858
859                 class_incref(obd, __FUNCTION__, obd);
860                 read_unlock(&obd_dev_lock);
861                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862                                          sizeof(KEY_SPTLRPC_CONF),
863                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
864                 rc = rc ? rc : rc2;
865                 class_decref(obd, __FUNCTION__, obd);
866                 read_lock(&obd_dev_lock);
867         }
868         read_unlock(&obd_dev_lock);
869         return rc;
870 }
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
872
873 void obd_cleanup_caches(void)
874 {
875         ENTRY;
876         if (obd_device_cachep) {
877                 kmem_cache_destroy(obd_device_cachep);
878                 obd_device_cachep = NULL;
879         }
880
881         EXIT;
882 }
883
884 int obd_init_caches(void)
885 {
886         int rc;
887         ENTRY;
888
889         LASSERT(obd_device_cachep == NULL);
890         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891                                 sizeof(struct obd_device),
892                                 0, 0, 0, sizeof(struct obd_device), NULL);
893         if (!obd_device_cachep)
894                 GOTO(out, rc = -ENOMEM);
895
896         RETURN(0);
897 out:
898         obd_cleanup_caches();
899         RETURN(rc);
900 }
901
902 static struct portals_handle_ops export_handle_ops;
903
904 /* map connection to client */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
906 {
907         struct obd_export *export;
908         ENTRY;
909
910         if (!conn) {
911                 CDEBUG(D_CACHE, "looking for null handle\n");
912                 RETURN(NULL);
913         }
914
915         if (conn->cookie == -1) {  /* this means assign a new connection */
916                 CDEBUG(D_CACHE, "want a new connection\n");
917                 RETURN(NULL);
918         }
919
920         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921         export = class_handle2object(conn->cookie, &export_handle_ops);
922         RETURN(export);
923 }
924 EXPORT_SYMBOL(class_conn2export);
925
926 struct obd_device *class_exp2obd(struct obd_export *exp)
927 {
928         if (exp)
929                 return exp->exp_obd;
930         return NULL;
931 }
932 EXPORT_SYMBOL(class_exp2obd);
933
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
935 {
936         struct obd_device *obd = exp->exp_obd;
937         if (obd == NULL)
938                 return NULL;
939         return obd->u.cli.cl_import;
940 }
941 EXPORT_SYMBOL(class_exp2cliimp);
942
943 /* Export management functions */
/**
 * Final teardown of an export whose handle refcount has dropped to zero.
 *
 * Releases the export's connection (if it has one), asserts that the
 * reply, replay and high-priority RPC lists checked below are empty,
 * calls obd_destroy_export(), drops the obd reference for any export
 * other than the device's self export, and finally frees the export
 * through kfree_rcu().
 *
 * \param[in] exp  export to destroy; must have a non-NULL exp_obd
 */
static void class_export_destroy(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;
        ENTRY;

        LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
        LASSERT(obd != NULL);

        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
               exp->exp_client_uuid.uuid, obd->obd_name);

        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
        if (exp->exp_connection)
                ptlrpc_put_connection_superhack(exp->exp_connection);

        LASSERT(list_empty(&exp->exp_outstanding_replies));
        LASSERT(list_empty(&exp->exp_uncommitted_replies));
        LASSERT(list_empty(&exp->exp_req_replay_queue));
        LASSERT(list_empty(&exp->exp_hp_rpcs));
        obd_destroy_export(exp);
        /* self export doesn't hold a reference to an obd, although it
         * exists until freeing of the obd */
        if (exp != obd->obd_self_export)
                class_decref(obd, "export", exp);

        /* account for the free here; the memory itself is released by
         * kfree_rcu() through the handle's h_rcu head */
        OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
        kfree_rcu(exp, exp_handle.h_rcu);
        EXIT;
}
973
/* Handle ops used when hashing and looking up export handles in this
 * file; only the type name is set. */
static struct portals_handle_ops export_handle_ops = {
        .hop_type       = "export",
};
977
/**
 * Take an additional reference on \a exp.
 *
 * \param[in] exp  export to pin; dereferenced unconditionally, so it
 *                 must not be NULL
 *
 * \retval \a exp, to allow use in expressions
 */
struct obd_export *class_export_get(struct obd_export *exp)
{
        refcount_inc(&exp->exp_handle.h_ref);
        /* the count is re-read for logging only */
        CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
               refcount_read(&exp->exp_handle.h_ref));
        return exp;
}
985 EXPORT_SYMBOL(class_export_get);
986
987 void class_export_put(struct obd_export *exp)
988 {
989         LASSERT(exp != NULL);
990         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
991         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
992         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
993                refcount_read(&exp->exp_handle.h_ref) - 1);
994
995         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
996                 struct obd_device *obd = exp->exp_obd;
997
998                 CDEBUG(D_IOCTL, "final put %p/%s\n",
999                        exp, exp->exp_client_uuid.uuid);
1000
1001                 /* release nid stat refererence */
1002                 lprocfs_exp_cleanup(exp);
1003
1004                 if (exp == obd->obd_self_export) {
1005                         /* self export should be destroyed without
1006                          * zombie thread as it doesn't hold a
1007                          * reference to obd and doesn't hold any
1008                          * resources */
1009                         class_export_destroy(exp);
1010                         /* self export is destroyed, no class
1011                          * references exist and it is safe to free
1012                          * obd */
1013                         class_free_dev(obd);
1014                 } else {
1015                         LASSERT(!list_empty(&exp->exp_obd_chain));
1016                         obd_zombie_export_add(exp);
1017                 }
1018
1019         }
1020 }
1021 EXPORT_SYMBOL(class_export_put);
1022
1023 static void obd_zombie_exp_cull(struct work_struct *ws)
1024 {
1025         struct obd_export *export;
1026
1027         export = container_of(ws, struct obd_export, exp_zombie_work);
1028         class_export_destroy(export);
1029 }
1030
1031 /* Creates a new export, adds it to the hash table, and returns a
1032  * pointer to it. The refcount is 2: one for the hash reference, and
1033  * one for the pointer returned by this function. */
1034 struct obd_export *__class_new_export(struct obd_device *obd,
1035                                       struct obd_uuid *cluuid, bool is_self)
1036 {
1037         struct obd_export *export;
1038         int rc = 0;
1039         ENTRY;
1040
1041         OBD_ALLOC_PTR(export);
1042         if (!export)
1043                 return ERR_PTR(-ENOMEM);
1044
1045         export->exp_conn_cnt = 0;
1046         export->exp_lock_hash = NULL;
1047         export->exp_flock_hash = NULL;
1048         /* 2 = class_handle_hash + last */
1049         refcount_set(&export->exp_handle.h_ref, 2);
1050         atomic_set(&export->exp_rpc_count, 0);
1051         atomic_set(&export->exp_cb_count, 0);
1052         atomic_set(&export->exp_locks_count, 0);
1053 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1054         INIT_LIST_HEAD(&export->exp_locks_list);
1055         spin_lock_init(&export->exp_locks_list_guard);
1056 #endif
1057         atomic_set(&export->exp_replay_count, 0);
1058         export->exp_obd = obd;
1059         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1060         spin_lock_init(&export->exp_uncommitted_replies_lock);
1061         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1062         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1063         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1064         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1065         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1066         class_handle_hash(&export->exp_handle, &export_handle_ops);
1067         export->exp_last_request_time = ktime_get_real_seconds();
1068         spin_lock_init(&export->exp_lock);
1069         spin_lock_init(&export->exp_rpc_lock);
1070         INIT_HLIST_NODE(&export->exp_nid_hash);
1071         INIT_HLIST_NODE(&export->exp_gen_hash);
1072         spin_lock_init(&export->exp_bl_list_lock);
1073         INIT_LIST_HEAD(&export->exp_bl_list);
1074         INIT_LIST_HEAD(&export->exp_stale_list);
1075         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1076
1077         export->exp_sp_peer = LUSTRE_SP_ANY;
1078         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1079         export->exp_client_uuid = *cluuid;
1080         obd_init_export(export);
1081
1082         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1083
1084         spin_lock(&obd->obd_dev_lock);
1085         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1086                 /* shouldn't happen, but might race */
1087                 if (obd->obd_stopping)
1088                         GOTO(exit_unlock, rc = -ENODEV);
1089
1090                 rc = obd_uuid_add(obd, export);
1091                 if (rc != 0) {
1092                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1093                                       obd->obd_name, cluuid->uuid, rc);
1094                         GOTO(exit_unlock, rc = -EALREADY);
1095                 }
1096         }
1097
1098         if (!is_self) {
1099                 class_incref(obd, "export", export);
1100                 list_add_tail(&export->exp_obd_chain_timed,
1101                               &obd->obd_exports_timed);
1102                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1103                 obd->obd_num_exports++;
1104         } else {
1105                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1106                 INIT_LIST_HEAD(&export->exp_obd_chain);
1107         }
1108         spin_unlock(&obd->obd_dev_lock);
1109         RETURN(export);
1110
1111 exit_unlock:
1112         spin_unlock(&obd->obd_dev_lock);
1113         class_handle_unhash(&export->exp_handle);
1114         obd_destroy_export(export);
1115         OBD_FREE_PTR(export);
1116         return ERR_PTR(rc);
1117 }
1118
/**
 * Create a regular (non-self) export for \a obd.
 *
 * Thin wrapper around __class_new_export() with is_self == false; the
 * return value is passed through unchanged.
 */
struct obd_export *class_new_export(struct obd_device *obd,
                                    struct obd_uuid *uuid)
{
        return __class_new_export(obd, uuid, false);
}
1124 EXPORT_SYMBOL(class_new_export);
1125
/**
 * Create the self export for \a obd.
 *
 * Thin wrapper around __class_new_export() with is_self == true; the
 * return value is passed through unchanged.
 */
struct obd_export *class_new_export_self(struct obd_device *obd,
                                         struct obd_uuid *uuid)
{
        return __class_new_export(obd, uuid, true);
}
1131
1132 void class_unlink_export(struct obd_export *exp)
1133 {
1134         class_handle_unhash(&exp->exp_handle);
1135
1136         if (exp->exp_obd->obd_self_export == exp) {
1137                 class_export_put(exp);
1138                 return;
1139         }
1140
1141         spin_lock(&exp->exp_obd->obd_dev_lock);
1142         /* delete an uuid-export hashitem from hashtables */
1143         if (exp != exp->exp_obd->obd_self_export)
1144                 obd_uuid_del(exp->exp_obd, exp);
1145
1146 #ifdef HAVE_SERVER_SUPPORT
1147         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1148                 struct tg_export_data   *ted = &exp->exp_target_data;
1149                 struct cfs_hash         *hash;
1150
1151                 /* Because obd_gen_hash will not be released until
1152                  * class_cleanup(), so hash should never be NULL here */
1153                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1154                 LASSERT(hash != NULL);
1155                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1156                              &exp->exp_gen_hash);
1157                 cfs_hash_putref(hash);
1158         }
1159 #endif /* HAVE_SERVER_SUPPORT */
1160
1161         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1162         list_del_init(&exp->exp_obd_chain_timed);
1163         exp->exp_obd->obd_num_exports--;
1164         spin_unlock(&exp->exp_obd->obd_dev_lock);
1165         atomic_inc(&obd_stale_export_num);
1166
1167         /* A reference is kept by obd_stale_exports list */
1168         obd_stale_export_put(exp);
1169 }
1170 EXPORT_SYMBOL(class_unlink_export);
1171
1172 /* Import management functions */
/**
 * Free an import whose refcount has reached zero.
 *
 * Releases the import's connection, then every entry on imp_conn_list
 * (dropping each entry's connection and freeing the entry), drops the
 * "import" reference on the obd device and frees the import itself.
 *
 * \param[in] imp  import to free; imp_sec must already be NULL
 */
static void obd_zombie_import_free(struct obd_import *imp)
{
        ENTRY;

        CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
                imp->imp_obd->obd_name);

        LASSERT_ATOMIC_ZERO(&imp->imp_refcount);

        ptlrpc_put_connection_superhack(imp->imp_connection);

        /* pop entries off the head until the list is empty */
        while (!list_empty(&imp->imp_conn_list)) {
                struct obd_import_conn *imp_conn;

                imp_conn = list_entry(imp->imp_conn_list.next,
                                      struct obd_import_conn, oic_item);
                list_del_init(&imp_conn->oic_item);
                ptlrpc_put_connection_superhack(imp_conn->oic_conn);
                OBD_FREE(imp_conn, sizeof(*imp_conn));
        }

        LASSERT(imp->imp_sec == NULL);
        /* pairs with the class_incref(obd, "import", imp) taken when
         * the import was created */
        class_decref(imp->imp_obd, "import", imp);
        OBD_FREE_PTR(imp);
        EXIT;
}
1199
/**
 * Take an additional reference on \a import.
 *
 * \param[in] import  import to pin; dereferenced unconditionally, so it
 *                    must not be NULL
 *
 * \retval \a import, to allow use in expressions
 */
struct obd_import *class_import_get(struct obd_import *import)
{
        atomic_inc(&import->imp_refcount);
        /* the count is re-read for logging only */
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
               atomic_read(&import->imp_refcount),
               import->imp_obd->obd_name);
        return import;
}
1208 EXPORT_SYMBOL(class_import_get);
1209
1210 void class_import_put(struct obd_import *imp)
1211 {
1212         ENTRY;
1213
1214         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1215
1216         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1217                atomic_read(&imp->imp_refcount) - 1,
1218                imp->imp_obd->obd_name);
1219
1220         if (atomic_dec_and_test(&imp->imp_refcount)) {
1221                 CDEBUG(D_INFO, "final put import %p\n", imp);
1222                 obd_zombie_import_add(imp);
1223         }
1224
1225         EXIT;
1226 }
1227 EXPORT_SYMBOL(class_import_put);
1228
1229 static void init_imp_at(struct imp_at *at) {
1230         int i;
1231         at_init(&at->iat_net_latency, 0, 0);
1232         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1233                 /* max service estimates are tracked on the server side, so
1234                    don't use the AT history here, just use the last reported
1235                    val. (But keep hist for proc histogram, worst_ever) */
1236                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1237                         AT_FLG_NOHIST);
1238         }
1239 }
1240
1241 static void obd_zombie_imp_cull(struct work_struct *ws)
1242 {
1243         struct obd_import *import;
1244
1245         import = container_of(ws, struct obd_import, imp_zombie_work);
1246         obd_zombie_import_free(import);
1247 }
1248
1249 struct obd_import *class_new_import(struct obd_device *obd)
1250 {
1251         struct obd_import *imp;
1252         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1253
1254         OBD_ALLOC(imp, sizeof(*imp));
1255         if (imp == NULL)
1256                 return NULL;
1257
1258         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1259         INIT_LIST_HEAD(&imp->imp_replay_list);
1260         INIT_LIST_HEAD(&imp->imp_sending_list);
1261         INIT_LIST_HEAD(&imp->imp_delayed_list);
1262         INIT_LIST_HEAD(&imp->imp_committed_list);
1263         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1264         imp->imp_known_replied_xid = 0;
1265         imp->imp_replay_cursor = &imp->imp_committed_list;
1266         spin_lock_init(&imp->imp_lock);
1267         imp->imp_last_success_conn = 0;
1268         imp->imp_state = LUSTRE_IMP_NEW;
1269         imp->imp_obd = class_incref(obd, "import", imp);
1270         rwlock_init(&imp->imp_sec_lock);
1271         init_waitqueue_head(&imp->imp_recovery_waitq);
1272         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1273
1274         if (curr_pid_ns->child_reaper)
1275                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1276         else
1277                 imp->imp_sec_refpid = 1;
1278
1279         atomic_set(&imp->imp_refcount, 2);
1280         atomic_set(&imp->imp_unregistering, 0);
1281         atomic_set(&imp->imp_inflight, 0);
1282         atomic_set(&imp->imp_replay_inflight, 0);
1283         atomic_set(&imp->imp_inval_count, 0);
1284         INIT_LIST_HEAD(&imp->imp_conn_list);
1285         init_imp_at(&imp->imp_at);
1286
1287         /* the default magic is V2, will be used in connect RPC, and
1288          * then adjusted according to the flags in request/reply. */
1289         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1290
1291         return imp;
1292 }
1293 EXPORT_SYMBOL(class_new_import);
1294
/**
 * Bump the import's generation and drop one reference on it.
 *
 * \param[in] import  import to release; must be neither NULL nor the
 *                    LP_POISON pattern
 */
void class_destroy_import(struct obd_import *import)
{
        LASSERT(import != NULL);
        LASSERT(import != LP_POISON);

        /* imp_generation is updated under imp_lock */
        spin_lock(&import->imp_lock);
        import->imp_generation++;
        spin_unlock(&import->imp_lock);
        class_import_put(import);
}
1305 EXPORT_SYMBOL(class_destroy_import);
1306
1307 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1308
1309 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1310 {
1311         spin_lock(&exp->exp_locks_list_guard);
1312
1313         LASSERT(lock->l_exp_refs_nr >= 0);
1314
1315         if (lock->l_exp_refs_target != NULL &&
1316             lock->l_exp_refs_target != exp) {
1317                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1318                               exp, lock, lock->l_exp_refs_target);
1319         }
1320         if ((lock->l_exp_refs_nr ++) == 0) {
1321                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1322                 lock->l_exp_refs_target = exp;
1323         }
1324         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1325                lock, exp, lock->l_exp_refs_nr);
1326         spin_unlock(&exp->exp_locks_list_guard);
1327 }
1328 EXPORT_SYMBOL(__class_export_add_lock_ref);
1329
1330 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1331 {
1332         spin_lock(&exp->exp_locks_list_guard);
1333         LASSERT(lock->l_exp_refs_nr > 0);
1334         if (lock->l_exp_refs_target != exp) {
1335                 LCONSOLE_WARN("lock %p, "
1336                               "mismatching export pointers: %p, %p\n",
1337                               lock, lock->l_exp_refs_target, exp);
1338         }
1339         if (-- lock->l_exp_refs_nr == 0) {
1340                 list_del_init(&lock->l_exp_refs_link);
1341                 lock->l_exp_refs_target = NULL;
1342         }
1343         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1344                lock, exp, lock->l_exp_refs_nr);
1345         spin_unlock(&exp->exp_locks_list_guard);
1346 }
1347 EXPORT_SYMBOL(__class_export_del_lock_ref);
1348 #endif
1349
1350 /* A connection defines an export context in which preallocation can
1351    be managed. This releases the export pointer reference, and returns
1352    the export handle, so the export refcount is 1 when this function
1353    returns. */
1354 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1355                   struct obd_uuid *cluuid)
1356 {
1357         struct obd_export *export;
1358         LASSERT(conn != NULL);
1359         LASSERT(obd != NULL);
1360         LASSERT(cluuid != NULL);
1361         ENTRY;
1362
1363         export = class_new_export(obd, cluuid);
1364         if (IS_ERR(export))
1365                 RETURN(PTR_ERR(export));
1366
1367         conn->cookie = export->exp_handle.h_cookie;
1368         class_export_put(export);
1369
1370         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1371                cluuid->uuid, conn->cookie);
1372         RETURN(0);
1373 }
1374 EXPORT_SYMBOL(class_connect);
1375
1376 /* if export is involved in recovery then clean up related things */
1377 static void class_export_recovery_cleanup(struct obd_export *exp)
1378 {
1379         struct obd_device *obd = exp->exp_obd;
1380
1381         spin_lock(&obd->obd_recovery_task_lock);
1382         if (obd->obd_recovering) {
1383                 if (exp->exp_in_recovery) {
1384                         spin_lock(&exp->exp_lock);
1385                         exp->exp_in_recovery = 0;
1386                         spin_unlock(&exp->exp_lock);
1387                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1388                         atomic_dec(&obd->obd_connected_clients);
1389                 }
1390
1391                 /* if called during recovery then should update
1392                  * obd_stale_clients counter,
1393                  * lightweight exports are not counted */
1394                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1395                         exp->exp_obd->obd_stale_clients++;
1396         }
1397         spin_unlock(&obd->obd_recovery_task_lock);
1398
1399         spin_lock(&exp->exp_lock);
1400         /** Cleanup req replay fields */
1401         if (exp->exp_req_replay_needed) {
1402                 exp->exp_req_replay_needed = 0;
1403
1404                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1405                 atomic_dec(&obd->obd_req_replay_clients);
1406         }
1407
1408         /** Cleanup lock replay data */
1409         if (exp->exp_lock_replay_needed) {
1410                 exp->exp_lock_replay_needed = 0;
1411
1412                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1413                 atomic_dec(&obd->obd_lock_replay_clients);
1414         }
1415         spin_unlock(&exp->exp_lock);
1416 }
1417
/* This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in
 * and, if a disconnect is really needed,
 * 2 - for removal from the hash
 * 3 - in class_unlink_export()
 * The export pointer passed to this function may be destroyed by the
 * time it returns.
 *
 * Returns -EINVAL for a NULL export and 0 otherwise, including when the
 * export had already been disconnected. */
int class_disconnect(struct obd_export *export)
{
        int already_disconnected;
        ENTRY;

        if (export == NULL) {
                CWARN("attempting to free NULL export %p\n", export);
                RETURN(-EINVAL);
        }

        /* test-and-set exp_disconnected under exp_lock so that only one
         * of several racing callers goes on to do the real disconnect */
        spin_lock(&export->exp_lock);
        already_disconnected = export->exp_disconnected;
        export->exp_disconnected = 1;
        /*  We hold references of export for uuid hash
         *  and nid_hash and export link at least. So
         *  it is safe to call cfs_hash_del in there.  */
        if (!hlist_unhashed(&export->exp_nid_hash))
                cfs_hash_del(export->exp_obd->obd_nid_hash,
                             &export->exp_connection->c_peer.nid,
                             &export->exp_nid_hash);
        spin_unlock(&export->exp_lock);

        /* class_cleanup(), abort_recovery(), and class_fail_export()
         * all end up in here, and if any of them race we shouldn't
         * call extra class_export_puts(). */
        if (already_disconnected) {
                LASSERT(hlist_unhashed(&export->exp_nid_hash));
                GOTO(no_disconn, already_disconnected);
        }

        CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
               export->exp_handle.h_cookie);

        class_export_recovery_cleanup(export);
        class_unlink_export(export);
no_disconn:
        /* drop the reference for the pointer the caller passed in */
        class_export_put(export);
        RETURN(0);
}
1463 EXPORT_SYMBOL(class_disconnect);
1464
1465 /* Return non-zero for a fully connected export */
1466 int class_connected_export(struct obd_export *exp)
1467 {
1468         int connected = 0;
1469
1470         if (exp) {
1471                 spin_lock(&exp->exp_lock);
1472                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1473                 spin_unlock(&exp->exp_lock);
1474         }
1475         return connected;
1476 }
1477 EXPORT_SYMBOL(class_connected_export);
1478
/**
 * Disconnect every export on \a list.
 *
 * Repeatedly takes the first export on the list, stores \a flags in its
 * exp_flags and disconnects it with obd_disconnect().  An export whose
 * client UUID equals its obd's UUID is not disconnected; it is only
 * unlinked from the list.  The loop ends when the list is empty.
 *
 * \param[in] list   list of exports linked through exp_obd_chain
 * \param[in] flags  value written to each export's exp_flags
 */
static void class_disconnect_export_list(struct list_head *list,
                                         enum obd_option flags)
{
        int rc;
        struct obd_export *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!list_empty(list)) {
                exp = list_entry(list->next, struct obd_export,
                                 exp_obd_chain);
                /* need for safe call CDEBUG after obd_disconnect */
                class_export_get(exp);

                spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* second reference: this is the one obd_disconnect()
                 * consumes; the first keeps exp valid for the CDEBUG
                 * that follows */
                class_export_get(exp);
                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at %lld\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                /* release one export reference anyway */
                rc = obd_disconnect(exp);

                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
                       obd_export_nid2str(exp), exp, rc);
                class_export_put(exp);
        }
        EXIT;
}
1524
1525 void class_disconnect_exports(struct obd_device *obd)
1526 {
1527         struct list_head work_list;
1528         ENTRY;
1529
1530         /* Move all of the exports from obd_exports to a work list, en masse. */
1531         INIT_LIST_HEAD(&work_list);
1532         spin_lock(&obd->obd_dev_lock);
1533         list_splice_init(&obd->obd_exports, &work_list);
1534         list_splice_init(&obd->obd_delayed_exports, &work_list);
1535         spin_unlock(&obd->obd_dev_lock);
1536
1537         if (!list_empty(&work_list)) {
1538                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1539                        "disconnecting them\n", obd->obd_minor, obd);
1540                 class_disconnect_export_list(&work_list,
1541                                              exp_flags_from_obd(obd));
1542         } else
1543                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1544                        obd->obd_minor, obd);
1545         EXIT;
1546 }
1547 EXPORT_SYMBOL(class_disconnect_exports);
1548
/* Remove exports that have not completed recovery.
 *
 * Walks obd_exports under obd_dev_lock.  An export is left alone when
 * it is the self export (client UUID equals the obd UUID), when its
 * ted_lr_idx is -1, when it is already marked failed, or when
 * test_export() returns non-zero for it.  Every other export is marked
 * failed and moved to a private work list, which is then disconnected
 * with OBD_OPT_ABORT_RECOV added to the obd's export flags.
 *
 * test_export() is called with both obd_dev_lock and the export's
 * exp_lock held.
 */
void class_disconnect_stale_exports(struct obd_device *obd,
                                    int (*test_export)(struct obd_export *))
{
        struct list_head work_list;
        struct obd_export *exp, *n;
        int evicted = 0;
        ENTRY;

        INIT_LIST_HEAD(&work_list);
        spin_lock(&obd->obd_dev_lock);
        /* _safe variant: entries are moved off the list while walking */
        list_for_each_entry_safe(exp, n, &obd->obd_exports,
                                 exp_obd_chain) {
                /* don't count self-export as client */
                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid))
                        continue;

                /* don't evict clients which have no slot in last_rcvd
                 * (e.g. lightweight connection) */
                if (exp->exp_target_data.ted_lr_idx == -1)
                        continue;

                spin_lock(&exp->exp_lock);
                if (exp->exp_failed || test_export(exp)) {
                        spin_unlock(&exp->exp_lock);
                        continue;
                }
                exp->exp_failed = 1;
                spin_unlock(&exp->exp_lock);

                list_move(&exp->exp_obd_chain, &work_list);
                evicted++;
                CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
                       obd->obd_name, exp->exp_client_uuid.uuid,
                       obd_export_nid2str(exp));
                print_export_data(exp, "EVICTING", 0, D_HA);
        }
        spin_unlock(&obd->obd_dev_lock);

        if (evicted)
                LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
                              obd->obd_name, evicted);

        /* called even when nothing was collected; an empty work list
         * makes this a no-op loop */
        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
                                                 OBD_OPT_ABORT_RECOV);
        EXIT;
}
1598 EXPORT_SYMBOL(class_disconnect_stale_exports);
1599
1600 void class_fail_export(struct obd_export *exp)
1601 {
1602         int rc, already_failed;
1603
1604         spin_lock(&exp->exp_lock);
1605         already_failed = exp->exp_failed;
1606         exp->exp_failed = 1;
1607         spin_unlock(&exp->exp_lock);
1608
1609         if (already_failed) {
1610                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1611                        exp, exp->exp_client_uuid.uuid);
1612                 return;
1613         }
1614
1615         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1616                exp, exp->exp_client_uuid.uuid);
1617
1618         if (obd_dump_on_timeout)
1619                 libcfs_debug_dumplog();
1620
1621         /* need for safe call CDEBUG after obd_disconnect */
1622         class_export_get(exp);
1623
1624         /* Most callers into obd_disconnect are removing their own reference
1625          * (request, for example) in addition to the one from the hash table.
1626          * We don't have such a reference here, so make one. */
1627         class_export_get(exp);
1628         rc = obd_disconnect(exp);
1629         if (rc)
1630                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1631         else
1632                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1633                        exp, exp->exp_client_uuid.uuid);
1634         class_export_put(exp);
1635 }
1636 EXPORT_SYMBOL(class_fail_export);
1637
1638 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1639 {
1640         struct cfs_hash *nid_hash;
1641         struct obd_export *doomed_exp = NULL;
1642         int exports_evicted = 0;
1643
1644         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1645
1646         spin_lock(&obd->obd_dev_lock);
1647         /* umount has run already, so evict thread should leave
1648          * its task to umount thread now */
1649         if (obd->obd_stopping) {
1650                 spin_unlock(&obd->obd_dev_lock);
1651                 return exports_evicted;
1652         }
1653         nid_hash = obd->obd_nid_hash;
1654         cfs_hash_getref(nid_hash);
1655         spin_unlock(&obd->obd_dev_lock);
1656
1657         do {
1658                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1659                 if (doomed_exp == NULL)
1660                         break;
1661
1662                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1663                          "nid %s found, wanted nid %s, requested nid %s\n",
1664                          obd_export_nid2str(doomed_exp),
1665                          libcfs_nid2str(nid_key), nid);
1666                 LASSERTF(doomed_exp != obd->obd_self_export,
1667                          "self-export is hashed by NID?\n");
1668                 exports_evicted++;
1669                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1670                               "request\n", obd->obd_name,
1671                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1672                               obd_export_nid2str(doomed_exp));
1673                 class_fail_export(doomed_exp);
1674                 class_export_put(doomed_exp);
1675         } while (1);
1676
1677         cfs_hash_putref(nid_hash);
1678
1679         if (!exports_evicted)
1680                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1681                        obd->obd_name, nid);
1682         return exports_evicted;
1683 }
1684 EXPORT_SYMBOL(obd_export_evict_by_nid);
1685
1686 #ifdef HAVE_SERVER_SUPPORT
1687 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1688 {
1689         struct obd_export *doomed_exp = NULL;
1690         struct obd_uuid doomed_uuid;
1691         int exports_evicted = 0;
1692
1693         spin_lock(&obd->obd_dev_lock);
1694         if (obd->obd_stopping) {
1695                 spin_unlock(&obd->obd_dev_lock);
1696                 return exports_evicted;
1697         }
1698         spin_unlock(&obd->obd_dev_lock);
1699
1700         obd_str2uuid(&doomed_uuid, uuid);
1701         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1702                 CERROR("%s: can't evict myself\n", obd->obd_name);
1703                 return exports_evicted;
1704         }
1705
1706         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1707         if (doomed_exp == NULL) {
1708                 CERROR("%s: can't disconnect %s: no exports found\n",
1709                        obd->obd_name, uuid);
1710         } else {
1711                 CWARN("%s: evicting %s at adminstrative request\n",
1712                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1713                 class_fail_export(doomed_exp);
1714                 class_export_put(doomed_exp);
1715                 obd_uuid_del(obd, doomed_exp);
1716                 exports_evicted++;
1717         }
1718
1719         return exports_evicted;
1720 }
1721 #endif /* HAVE_SERVER_SUPPORT */
1722
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback.  print_export_data() calls it when
 * asked to dump locks and the pointer is set.  As a file-scope object it
 * starts out NULL without needing an explicit initialiser. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1727
1728 static void print_export_data(struct obd_export *exp, const char *status,
1729                               int locks, int debug_level)
1730 {
1731         struct ptlrpc_reply_state *rs;
1732         struct ptlrpc_reply_state *first_reply = NULL;
1733         int nreplies = 0;
1734
1735         spin_lock(&exp->exp_lock);
1736         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1737                             rs_exp_list) {
1738                 if (nreplies == 0)
1739                         first_reply = rs;
1740                 nreplies++;
1741         }
1742         spin_unlock(&exp->exp_lock);
1743
1744         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1745                "%p %s %llu stale:%d\n",
1746                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1747                obd_export_nid2str(exp),
1748                refcount_read(&exp->exp_handle.h_ref),
1749                atomic_read(&exp->exp_rpc_count),
1750                atomic_read(&exp->exp_cb_count),
1751                atomic_read(&exp->exp_locks_count),
1752                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1753                nreplies, first_reply, nreplies > 3 ? "..." : "",
1754                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1755 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1756         if (locks && class_export_dump_hook != NULL)
1757                 class_export_dump_hook(exp);
1758 #endif
1759 }
1760
1761 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1762 {
1763         struct obd_export *exp;
1764
1765         spin_lock(&obd->obd_dev_lock);
1766         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1767                 print_export_data(exp, "ACTIVE", locks, debug_level);
1768         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1769                 print_export_data(exp, "UNLINKED", locks, debug_level);
1770         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1771                 print_export_data(exp, "DELAYED", locks, debug_level);
1772         spin_unlock(&obd->obd_dev_lock);
1773 }
1774
1775 void obd_exports_barrier(struct obd_device *obd)
1776 {
1777         int waited = 2;
1778         LASSERT(list_empty(&obd->obd_exports));
1779         spin_lock(&obd->obd_dev_lock);
1780         while (!list_empty(&obd->obd_unlinked_exports)) {
1781                 spin_unlock(&obd->obd_dev_lock);
1782                 set_current_state(TASK_UNINTERRUPTIBLE);
1783                 schedule_timeout(cfs_time_seconds(waited));
1784                 if (waited > 5 && is_power_of_2(waited)) {
1785                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1786                                       "more than %d seconds. "
1787                                       "The obd refcount = %d. Is it stuck?\n",
1788                                       obd->obd_name, waited,
1789                                       atomic_read(&obd->obd_refcount));
1790                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1791                 }
1792                 waited *= 2;
1793                 spin_lock(&obd->obd_dev_lock);
1794         }
1795         spin_unlock(&obd->obd_dev_lock);
1796 }
1797 EXPORT_SYMBOL(obd_exports_barrier);
1798
1799 /**
1800  * Add export to the obd_zombe thread and notify it.
1801  */
1802 static void obd_zombie_export_add(struct obd_export *exp) {
1803         atomic_dec(&obd_stale_export_num);
1804         spin_lock(&exp->exp_obd->obd_dev_lock);
1805         LASSERT(!list_empty(&exp->exp_obd_chain));
1806         list_del_init(&exp->exp_obd_chain);
1807         spin_unlock(&exp->exp_obd->obd_dev_lock);
1808
1809         queue_work(zombie_wq, &exp->exp_zombie_work);
1810 }
1811
1812 /**
1813  * Add import to the obd_zombe thread and notify it.
1814  */
1815 static void obd_zombie_import_add(struct obd_import *imp) {
1816         LASSERT(imp->imp_sec == NULL);
1817
1818         queue_work(zombie_wq, &imp->imp_zombie_work);
1819 }
1820
1821 /**
1822  * wait when obd_zombie import/export queues become empty
1823  */
1824 void obd_zombie_barrier(void)
1825 {
1826         flush_workqueue(zombie_wq);
1827 }
1828 EXPORT_SYMBOL(obd_zombie_barrier);
1829
1830
1831 struct obd_export *obd_stale_export_get(void)
1832 {
1833         struct obd_export *exp = NULL;
1834         ENTRY;
1835
1836         spin_lock(&obd_stale_export_lock);
1837         if (!list_empty(&obd_stale_exports)) {
1838                 exp = list_entry(obd_stale_exports.next,
1839                                  struct obd_export, exp_stale_list);
1840                 list_del_init(&exp->exp_stale_list);
1841         }
1842         spin_unlock(&obd_stale_export_lock);
1843
1844         if (exp) {
1845                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1846                        atomic_read(&obd_stale_export_num));
1847         }
1848         RETURN(exp);
1849 }
1850 EXPORT_SYMBOL(obd_stale_export_get);
1851
1852 void obd_stale_export_put(struct obd_export *exp)
1853 {
1854         ENTRY;
1855
1856         LASSERT(list_empty(&exp->exp_stale_list));
1857         if (exp->exp_lock_hash &&
1858             atomic_read(&exp->exp_lock_hash->hs_count)) {
1859                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1860                        atomic_read(&obd_stale_export_num));
1861
1862                 spin_lock_bh(&exp->exp_bl_list_lock);
1863                 spin_lock(&obd_stale_export_lock);
1864                 /* Add to the tail if there is no blocked locks,
1865                  * to the head otherwise. */
1866                 if (list_empty(&exp->exp_bl_list))
1867                         list_add_tail(&exp->exp_stale_list,
1868                                       &obd_stale_exports);
1869                 else
1870                         list_add(&exp->exp_stale_list,
1871                                  &obd_stale_exports);
1872
1873                 spin_unlock(&obd_stale_export_lock);
1874                 spin_unlock_bh(&exp->exp_bl_list_lock);
1875         } else {
1876                 class_export_put(exp);
1877         }
1878         EXIT;
1879 }
1880 EXPORT_SYMBOL(obd_stale_export_put);
1881
1882 /**
1883  * Adjust the position of the export in the stale list,
1884  * i.e. move to the head of the list if is needed.
1885  **/
1886 void obd_stale_export_adjust(struct obd_export *exp)
1887 {
1888         LASSERT(exp != NULL);
1889         spin_lock_bh(&exp->exp_bl_list_lock);
1890         spin_lock(&obd_stale_export_lock);
1891
1892         if (!list_empty(&exp->exp_stale_list) &&
1893             !list_empty(&exp->exp_bl_list))
1894                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1895
1896         spin_unlock(&obd_stale_export_lock);
1897         spin_unlock_bh(&exp->exp_bl_list_lock);
1898 }
1899 EXPORT_SYMBOL(obd_stale_export_adjust);
1900
1901 /**
1902  * start destroy zombie import/export thread
1903  */
1904 int obd_zombie_impexp_init(void)
1905 {
1906         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1907         if (!zombie_wq)
1908                 return -ENOMEM;
1909
1910         return 0;
1911 }
1912
1913 /**
1914  * stop destroy zombie import/export thread
1915  */
1916 void obd_zombie_impexp_stop(void)
1917 {
1918         destroy_workqueue(zombie_wq);
1919         LASSERT(list_empty(&obd_stale_exports));
1920 }
1921
1922 /***** Kernel-userspace comm helpers *******/
1923
1924 /* Get length of entire message, including header */
1925 int kuc_len(int payload_len)
1926 {
1927         return sizeof(struct kuc_hdr) + payload_len;
1928 }
1929 EXPORT_SYMBOL(kuc_len);
1930
1931 /* Get a pointer to kuc header, given a ptr to the payload
1932  * @param p Pointer to payload area
1933  * @returns Pointer to kuc header
1934  */
1935 struct kuc_hdr * kuc_ptr(void *p)
1936 {
1937         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1938         LASSERT(lh->kuc_magic == KUC_MAGIC);
1939         return lh;
1940 }
1941 EXPORT_SYMBOL(kuc_ptr);
1942
1943 /* Alloc space for a message, and fill in header
1944  * @return Pointer to payload area
1945  */
1946 void *kuc_alloc(int payload_len, int transport, int type)
1947 {
1948         struct kuc_hdr *lh;
1949         int len = kuc_len(payload_len);
1950
1951         OBD_ALLOC(lh, len);
1952         if (lh == NULL)
1953                 return ERR_PTR(-ENOMEM);
1954
1955         lh->kuc_magic = KUC_MAGIC;
1956         lh->kuc_transport = transport;
1957         lh->kuc_msgtype = type;
1958         lh->kuc_msglen = len;
1959
1960         return (void *)(lh + 1);
1961 }
1962 EXPORT_SYMBOL(kuc_alloc);
1963
/* Release a message given a pointer to its payload area.  @payload_len
 * is used to compute the size passed to OBD_FREE().
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1971
1972 struct obd_request_slot_waiter {
1973         struct list_head        orsw_entry;
1974         wait_queue_head_t       orsw_waitq;
1975         bool                    orsw_signaled;
1976 };
1977
1978 static bool obd_request_slot_avail(struct client_obd *cli,
1979                                    struct obd_request_slot_waiter *orsw)
1980 {
1981         bool avail;
1982
1983         spin_lock(&cli->cl_loi_list_lock);
1984         avail = !!list_empty(&orsw->orsw_entry);
1985         spin_unlock(&cli->cl_loi_list_lock);
1986
1987         return avail;
1988 };
1989
1990 /*
1991  * For network flow control, the RPC sponsor needs to acquire a credit
1992  * before sending the RPC. The credits count for a connection is defined
1993  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1994  * the subsequent RPC sponsors need to wait until others released their
1995  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1996  */
1997 int obd_get_request_slot(struct client_obd *cli)
1998 {
1999         struct obd_request_slot_waiter   orsw;
2000         struct l_wait_info               lwi;
2001         int                              rc;
2002
2003         spin_lock(&cli->cl_loi_list_lock);
2004         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2005                 cli->cl_rpcs_in_flight++;
2006                 spin_unlock(&cli->cl_loi_list_lock);
2007                 return 0;
2008         }
2009
2010         init_waitqueue_head(&orsw.orsw_waitq);
2011         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2012         orsw.orsw_signaled = false;
2013         spin_unlock(&cli->cl_loi_list_lock);
2014
2015         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2016         rc = l_wait_event(orsw.orsw_waitq,
2017                           obd_request_slot_avail(cli, &orsw) ||
2018                           orsw.orsw_signaled,
2019                           &lwi);
2020
2021         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2022          * freed but other (such as obd_put_request_slot) is using it. */
2023         spin_lock(&cli->cl_loi_list_lock);
2024         if (rc != 0) {
2025                 if (!orsw.orsw_signaled) {
2026                         if (list_empty(&orsw.orsw_entry))
2027                                 cli->cl_rpcs_in_flight--;
2028                         else
2029                                 list_del(&orsw.orsw_entry);
2030                 }
2031         }
2032
2033         if (orsw.orsw_signaled) {
2034                 LASSERT(list_empty(&orsw.orsw_entry));
2035
2036                 rc = -EINTR;
2037         }
2038         spin_unlock(&cli->cl_loi_list_lock);
2039
2040         return rc;
2041 }
2042 EXPORT_SYMBOL(obd_get_request_slot);
2043
2044 void obd_put_request_slot(struct client_obd *cli)
2045 {
2046         struct obd_request_slot_waiter *orsw;
2047
2048         spin_lock(&cli->cl_loi_list_lock);
2049         cli->cl_rpcs_in_flight--;
2050
2051         /* If there is free slot, wakeup the first waiter. */
2052         if (!list_empty(&cli->cl_flight_waiters) &&
2053             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2054                 orsw = list_entry(cli->cl_flight_waiters.next,
2055                                   struct obd_request_slot_waiter, orsw_entry);
2056                 list_del_init(&orsw->orsw_entry);
2057                 cli->cl_rpcs_in_flight++;
2058                 wake_up(&orsw->orsw_waitq);
2059         }
2060         spin_unlock(&cli->cl_loi_list_lock);
2061 }
2062 EXPORT_SYMBOL(obd_put_request_slot);
2063
2064 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2065 {
2066         return cli->cl_max_rpcs_in_flight;
2067 }
2068 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2069
2070 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2071 {
2072         struct obd_request_slot_waiter *orsw;
2073         __u32                           old;
2074         int                             diff;
2075         int                             i;
2076         const char *type_name;
2077         int                             rc;
2078
2079         if (max > OBD_MAX_RIF_MAX || max < 1)
2080                 return -ERANGE;
2081
2082         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2083         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2084                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2085                  * strictly lower that max_rpcs_in_flight */
2086                 if (max < 2) {
2087                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2088                                "because it must be higher than "
2089                                "max_mod_rpcs_in_flight value",
2090                                cli->cl_import->imp_obd->obd_name);
2091                         return -ERANGE;
2092                 }
2093                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2094                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2095                         if (rc != 0)
2096                                 return rc;
2097                 }
2098         }
2099
2100         spin_lock(&cli->cl_loi_list_lock);
2101         old = cli->cl_max_rpcs_in_flight;
2102         cli->cl_max_rpcs_in_flight = max;
2103         client_adjust_max_dirty(cli);
2104
2105         diff = max - old;
2106
2107         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2108         for (i = 0; i < diff; i++) {
2109                 if (list_empty(&cli->cl_flight_waiters))
2110                         break;
2111
2112                 orsw = list_entry(cli->cl_flight_waiters.next,
2113                                   struct obd_request_slot_waiter, orsw_entry);
2114                 list_del_init(&orsw->orsw_entry);
2115                 cli->cl_rpcs_in_flight++;
2116                 wake_up(&orsw->orsw_waitq);
2117         }
2118         spin_unlock(&cli->cl_loi_list_lock);
2119
2120         return 0;
2121 }
2122 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2123
2124 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2125 {
2126         return cli->cl_max_mod_rpcs_in_flight;
2127 }
2128 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2129
2130 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2131 {
2132         struct obd_connect_data *ocd;
2133         __u16 maxmodrpcs;
2134         __u16 prev;
2135
2136         if (max > OBD_MAX_RIF_MAX || max < 1)
2137                 return -ERANGE;
2138
2139         /* cannot exceed or equal max_rpcs_in_flight */
2140         if (max >= cli->cl_max_rpcs_in_flight) {
2141                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2142                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2143                        cli->cl_import->imp_obd->obd_name,
2144                        max, cli->cl_max_rpcs_in_flight);
2145                 return -ERANGE;
2146         }
2147
2148         /* cannot exceed max modify RPCs in flight supported by the server */
2149         ocd = &cli->cl_import->imp_connect_data;
2150         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2151                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2152         else
2153                 maxmodrpcs = 1;
2154         if (max > maxmodrpcs) {
2155                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2156                        "higher than max_mod_rpcs_per_client value (%hu) "
2157                        "returned by the server at connection\n",
2158                        cli->cl_import->imp_obd->obd_name,
2159                        max, maxmodrpcs);
2160                 return -ERANGE;
2161         }
2162
2163         spin_lock(&cli->cl_mod_rpcs_lock);
2164
2165         prev = cli->cl_max_mod_rpcs_in_flight;
2166         cli->cl_max_mod_rpcs_in_flight = max;
2167
2168         /* wakeup waiters if limit has been increased */
2169         if (cli->cl_max_mod_rpcs_in_flight > prev)
2170                 wake_up(&cli->cl_mod_rpcs_waitq);
2171
2172         spin_unlock(&cli->cl_mod_rpcs_lock);
2173
2174         return 0;
2175 }
2176 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2177
2178 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2179                                struct seq_file *seq)
2180 {
2181         unsigned long mod_tot = 0, mod_cum;
2182         struct timespec64 now;
2183         int i;
2184
2185         ktime_get_real_ts64(&now);
2186
2187         spin_lock(&cli->cl_mod_rpcs_lock);
2188
2189         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2190                    (s64)now.tv_sec, now.tv_nsec);
2191         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2192                    cli->cl_mod_rpcs_in_flight);
2193
2194         seq_printf(seq, "\n\t\t\tmodify\n");
2195         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2196
2197         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2198
2199         mod_cum = 0;
2200         for (i = 0; i < OBD_HIST_MAX; i++) {
2201                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2202                 mod_cum += mod;
2203                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2204                            i, mod, pct(mod, mod_tot),
2205                            pct(mod_cum, mod_tot));
2206                 if (mod_cum == mod_tot)
2207                         break;
2208         }
2209
2210         spin_unlock(&cli->cl_mod_rpcs_lock);
2211
2212         return 0;
2213 }
2214 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2215
2216 /* The number of modify RPCs sent in parallel is limited
2217  * because the server has a finite number of slots per client to
2218  * store request result and ensure reply reconstruction when needed.
2219  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2220  * that takes into account server limit and cl_max_rpcs_in_flight
2221  * value.
2222  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2223  * one close request is allowed above the maximum.
2224  */
2225 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2226                                                  bool close_req)
2227 {
2228         bool avail;
2229
2230         /* A slot is available if
2231          * - number of modify RPCs in flight is less than the max
2232          * - it's a close RPC and no other close request is in flight
2233          */
2234         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2235                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2236
2237         return avail;
2238 }
2239
2240 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2241                                          bool close_req)
2242 {
2243         bool avail;
2244
2245         spin_lock(&cli->cl_mod_rpcs_lock);
2246         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2247         spin_unlock(&cli->cl_mod_rpcs_lock);
2248         return avail;
2249 }
2250
2251
2252 /* Get a modify RPC slot from the obd client @cli according
2253  * to the kind of operation @opc that is going to be sent
2254  * and the intent @it of the operation if it applies.
2255  * If the maximum number of modify RPCs in flight is reached
2256  * the thread is put to sleep.
2257  * Returns the tag to be set in the request message. Tag 0
2258  * is reserved for non-modifying requests.
2259  */
2260 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2261 {
2262         bool                    close_req = false;
2263         __u16                   i, max;
2264
2265         if (opc == MDS_CLOSE)
2266                 close_req = true;
2267
2268         do {
2269                 spin_lock(&cli->cl_mod_rpcs_lock);
2270                 max = cli->cl_max_mod_rpcs_in_flight;
2271                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2272                         /* there is a slot available */
2273                         cli->cl_mod_rpcs_in_flight++;
2274                         if (close_req)
2275                                 cli->cl_close_rpcs_in_flight++;
2276                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2277                                          cli->cl_mod_rpcs_in_flight);
2278                         /* find a free tag */
2279                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2280                                                 max + 1);
2281                         LASSERT(i < OBD_MAX_RIF_MAX);
2282                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2283                         spin_unlock(&cli->cl_mod_rpcs_lock);
2284                         /* tag 0 is reserved for non-modify RPCs */
2285
2286                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2287                                "opc %u, max %hu\n",
2288                                cli->cl_import->imp_obd->obd_name,
2289                                i + 1, opc, max);
2290
2291                         return i + 1;
2292                 }
2293                 spin_unlock(&cli->cl_mod_rpcs_lock);
2294
2295                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2296                        "opc %u, max %hu\n",
2297                        cli->cl_import->imp_obd->obd_name, opc, max);
2298
2299                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2300                                           obd_mod_rpc_slot_avail(cli,
2301                                                                  close_req));
2302         } while (true);
2303 }
2304 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2305
2306 /* Put a modify RPC slot from the obd client @cli according
2307  * to the kind of operation @opc that has been sent.
2308  */
2309 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2310 {
2311         bool                    close_req = false;
2312
2313         if (tag == 0)
2314                 return;
2315
2316         if (opc == MDS_CLOSE)
2317                 close_req = true;
2318
2319         spin_lock(&cli->cl_mod_rpcs_lock);
2320         cli->cl_mod_rpcs_in_flight--;
2321         if (close_req)
2322                 cli->cl_close_rpcs_in_flight--;
2323         /* release the tag in the bitmap */
2324         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2325         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2326         spin_unlock(&cli->cl_mod_rpcs_lock);
2327         wake_up(&cli->cl_mod_rpcs_waitq);
2328 }
2329 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2330