Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
50
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
54
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58                               const char *status, int locks, int debug_level);
59
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         module_put(type->typ_dt_ops->o_owner);
154         atomic_dec(&type->typ_refcnt);
155 }
156
157 static void class_sysfs_release(struct kobject *kobj)
158 {
159         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
160
161         debugfs_remove_recursive(type->typ_debugfs_entry);
162         type->typ_debugfs_entry = NULL;
163
164         if (type->typ_lu)
165                 lu_device_type_fini(type->typ_lu);
166
167 #ifdef CONFIG_PROC_FS
168         if (type->typ_name && type->typ_procroot)
169                 remove_proc_subtree(type->typ_name, proc_lustre_root);
170 #endif
171         OBD_FREE(type, sizeof(*type));
172 }
173
174 static struct kobj_type class_ktype = {
175         .sysfs_ops      = &lustre_sysfs_ops,
176         .release        = class_sysfs_release,
177 };
178
179 #ifdef HAVE_SERVER_SUPPORT
180 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
181 {
182         struct dentry *symlink;
183         struct obd_type *type;
184         int rc;
185
186         type = class_search_type(name);
187         if (type) {
188                 kobject_put(&type->typ_kobj);
189                 return ERR_PTR(-EEXIST);
190         }
191
192         OBD_ALLOC(type, sizeof(*type));
193         if (!type)
194                 return ERR_PTR(-ENOMEM);
195
196         type->typ_kobj.kset = lustre_kset;
197         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
198                                   &lustre_kset->kobj, "%s", name);
199         if (rc)
200                 return ERR_PTR(rc);
201
202         symlink = debugfs_create_dir(name, debugfs_lustre_root);
203         type->typ_debugfs_entry = symlink;
204         type->typ_sym_filter = true;
205
206         if (enable_proc) {
207                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
208                                                       NULL, NULL);
209                 if (IS_ERR(type->typ_procroot)) {
210                         CERROR("%s: can't create compat proc entry: %d\n",
211                                name, (int)PTR_ERR(type->typ_procroot));
212                         type->typ_procroot = NULL;
213                 }
214         }
215
216         return type;
217 }
218 EXPORT_SYMBOL(class_add_symlinks);
219 #endif /* HAVE_SERVER_SUPPORT */
220
221 #define CLASS_MAX_NAME 1024
222
223 int class_register_type(const struct obd_ops *dt_ops,
224                         const struct md_ops *md_ops,
225                         bool enable_proc,
226                         const char *name, struct lu_device_type *ldt)
227 {
228         struct obd_type *type;
229         int rc;
230
231         ENTRY;
232         /* sanity check */
233         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
234
235         type = class_search_type(name);
236         if (type) {
237 #ifdef HAVE_SERVER_SUPPORT
238                 if (type->typ_sym_filter)
239                         goto dir_exist;
240 #endif /* HAVE_SERVER_SUPPORT */
241                 kobject_put(&type->typ_kobj);
242                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
243                 RETURN(-EEXIST);
244         }
245
246         OBD_ALLOC(type, sizeof(*type));
247         if (type == NULL)
248                 RETURN(-ENOMEM);
249
250         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
251         type->typ_kobj.kset = lustre_kset;
252         kobject_init(&type->typ_kobj, &class_ktype);
253 #ifdef HAVE_SERVER_SUPPORT
254 dir_exist:
255 #endif /* HAVE_SERVER_SUPPORT */
256
257         type->typ_dt_ops = dt_ops;
258         type->typ_md_ops = md_ops;
259
260 #ifdef HAVE_SERVER_SUPPORT
261         if (type->typ_sym_filter) {
262                 type->typ_sym_filter = false;
263                 kobject_put(&type->typ_kobj);
264                 goto setup_ldt;
265         }
266 #endif
267 #ifdef CONFIG_PROC_FS
268         if (enable_proc && !type->typ_procroot) {
269                 type->typ_procroot = lprocfs_register(name,
270                                                       proc_lustre_root,
271                                                       NULL, type);
272                 if (IS_ERR(type->typ_procroot)) {
273                         rc = PTR_ERR(type->typ_procroot);
274                         type->typ_procroot = NULL;
275                         GOTO(failed, rc);
276                 }
277         }
278 #endif
279         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
280
281         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
282         if (rc)
283                 GOTO(failed, rc);
284 #ifdef HAVE_SERVER_SUPPORT
285 setup_ldt:
286 #endif
287         if (ldt) {
288                 rc = lu_device_type_init(ldt);
289                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
290                 wake_up_var(&type->typ_lu);
291                 if (rc)
292                         GOTO(failed, rc);
293         }
294
295         RETURN(0);
296
297 failed:
298         kobject_put(&type->typ_kobj);
299
300         RETURN(rc);
301 }
302 EXPORT_SYMBOL(class_register_type);
303
304 int class_unregister_type(const char *name)
305 {
306         struct obd_type *type = class_search_type(name);
307         int rc = 0;
308         ENTRY;
309
310         if (!type) {
311                 CERROR("unknown obd type\n");
312                 RETURN(-EINVAL);
313         }
314
315         if (atomic_read(&type->typ_refcnt)) {
316                 CERROR("type %s has refcount (%d)\n", name,
317                        atomic_read(&type->typ_refcnt));
318                 /* This is a bad situation, let's make the best of it */
319                 /* Remove ops, but leave the name for debugging */
320                 type->typ_dt_ops = NULL;
321                 type->typ_md_ops = NULL;
322                 GOTO(out_put, rc = -EBUSY);
323         }
324
325         /* Put the final ref */
326         kobject_put(&type->typ_kobj);
327 out_put:
328         /* Put the ref returned by class_search_type() */
329         kobject_put(&type->typ_kobj);
330
331         RETURN(rc);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
334
335 /**
336  * Create a new obd device.
337  *
338  * Allocate the new obd_device and initialize it.
339  *
340  * \param[in] type_name obd device type string.
341  * \param[in] name      obd device name.
342  * \param[in] uuid      obd device UUID
343  *
344  * \retval newdev         pointer to created obd_device
345  * \retval ERR_PTR(errno) on error
346  */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
348                                 const char *uuid)
349 {
350         struct obd_device *newdev;
351         struct obd_type *type = NULL;
352         ENTRY;
353
354         if (strlen(name) >= MAX_OBD_NAME) {
355                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
356                 RETURN(ERR_PTR(-EINVAL));
357         }
358
359         type = class_get_type(type_name);
360         if (type == NULL){
361                 CERROR("OBD: unknown type: %s\n", type_name);
362                 RETURN(ERR_PTR(-ENODEV));
363         }
364
365         newdev = obd_device_alloc();
366         if (newdev == NULL) {
367                 class_put_type(type);
368                 RETURN(ERR_PTR(-ENOMEM));
369         }
370         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
371         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
372         newdev->obd_type = type;
373         newdev->obd_minor = -1;
374
375         rwlock_init(&newdev->obd_pool_lock);
376         newdev->obd_pool_limit = 0;
377         newdev->obd_pool_slv = 0;
378
379         INIT_LIST_HEAD(&newdev->obd_exports);
380         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
381         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
382         INIT_LIST_HEAD(&newdev->obd_exports_timed);
383         INIT_LIST_HEAD(&newdev->obd_nid_stats);
384         spin_lock_init(&newdev->obd_nid_lock);
385         spin_lock_init(&newdev->obd_dev_lock);
386         mutex_init(&newdev->obd_dev_mutex);
387         spin_lock_init(&newdev->obd_osfs_lock);
388         /* newdev->obd_osfs_age must be set to a value in the distant
389          * past to guarantee a fresh statfs is fetched on mount. */
390         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
391
392         /* XXX belongs in setup not attach  */
393         init_rwsem(&newdev->obd_observer_link_sem);
394         /* recovery data */
395         spin_lock_init(&newdev->obd_recovery_task_lock);
396         init_waitqueue_head(&newdev->obd_next_transno_waitq);
397         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
398         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
399         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
400         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
401         INIT_LIST_HEAD(&newdev->obd_evict_list);
402         INIT_LIST_HEAD(&newdev->obd_lwp_list);
403
404         llog_group_init(&newdev->obd_olg);
405         /* Detach drops this */
406         atomic_set(&newdev->obd_refcount, 1);
407         lu_ref_init(&newdev->obd_reference);
408         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
409
410         newdev->obd_conn_inprogress = 0;
411
412         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
413
414         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
415                newdev->obd_name, newdev);
416
417         return newdev;
418 }
419
420 /**
421  * Free obd device.
422  *
423  * \param[in] obd obd_device to be freed
424  *
425  * \retval none
426  */
427 void class_free_dev(struct obd_device *obd)
428 {
429         struct obd_type *obd_type = obd->obd_type;
430
431         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
432                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
433         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
434                  "obd %p != obd_devs[%d] %p\n",
435                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
436         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
437                  "obd_refcount should be 0, not %d\n",
438                  atomic_read(&obd->obd_refcount));
439         LASSERT(obd_type != NULL);
440
441         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
442                obd->obd_name, obd->obd_type->typ_name);
443
444         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
445                          obd->obd_name, obd->obd_uuid.uuid);
446         if (obd->obd_stopping) {
447                 int err;
448
449                 /* If we're not stopping, we were never set up */
450                 err = obd_cleanup(obd);
451                 if (err)
452                         CERROR("Cleanup %s returned %d\n",
453                                 obd->obd_name, err);
454         }
455
456         obd_device_free(obd);
457
458         class_put_type(obd_type);
459 }
460
461 /**
462  * Unregister obd device.
463  *
464  * Free slot in obd_dev[] used by \a obd.
465  *
466  * \param[in] new_obd obd_device to be unregistered
467  *
468  * \retval none
469  */
470 void class_unregister_device(struct obd_device *obd)
471 {
472         write_lock(&obd_dev_lock);
473         if (obd->obd_minor >= 0) {
474                 LASSERT(obd_devs[obd->obd_minor] == obd);
475                 obd_devs[obd->obd_minor] = NULL;
476                 obd->obd_minor = -1;
477         }
478         write_unlock(&obd_dev_lock);
479 }
480
481 /**
482  * Register obd device.
483  *
484  * Find free slot in obd_devs[], fills it with \a new_obd.
485  *
486  * \param[in] new_obd obd_device to be registered
487  *
488  * \retval 0          success
489  * \retval -EEXIST    device with this name is registered
490  * \retval -EOVERFLOW obd_devs[] is full
491  */
492 int class_register_device(struct obd_device *new_obd)
493 {
494         int ret = 0;
495         int i;
496         int new_obd_minor = 0;
497         bool minor_assign = false;
498         bool retried = false;
499
500 again:
501         write_lock(&obd_dev_lock);
502         for (i = 0; i < class_devno_max(); i++) {
503                 struct obd_device *obd = class_num2obd(i);
504
505                 if (obd != NULL &&
506                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
507
508                         if (!retried) {
509                                 write_unlock(&obd_dev_lock);
510
511                                 /* the obd_device could be waited to be
512                                  * destroyed by the "obd_zombie_impexp_thread".
513                                  */
514                                 obd_zombie_barrier();
515                                 retried = true;
516                                 goto again;
517                         }
518
519                         CERROR("%s: already exists, won't add\n",
520                                obd->obd_name);
521                         /* in case we found a free slot before duplicate */
522                         minor_assign = false;
523                         ret = -EEXIST;
524                         break;
525                 }
526                 if (!minor_assign && obd == NULL) {
527                         new_obd_minor = i;
528                         minor_assign = true;
529                 }
530         }
531
532         if (minor_assign) {
533                 new_obd->obd_minor = new_obd_minor;
534                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
535                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
536                 obd_devs[new_obd_minor] = new_obd;
537         } else {
538                 if (ret == 0) {
539                         ret = -EOVERFLOW;
540                         CERROR("%s: all %u/%u devices used, increase "
541                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
542                                i, class_devno_max(), ret);
543                 }
544         }
545         write_unlock(&obd_dev_lock);
546
547         RETURN(ret);
548 }
549
550 static int class_name2dev_nolock(const char *name)
551 {
552         int i;
553
554         if (!name)
555                 return -1;
556
557         for (i = 0; i < class_devno_max(); i++) {
558                 struct obd_device *obd = class_num2obd(i);
559
560                 if (obd && strcmp(name, obd->obd_name) == 0) {
561                         /* Make sure we finished attaching before we give
562                            out any references */
563                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
564                         if (obd->obd_attached) {
565                                 return i;
566                         }
567                         break;
568                 }
569         }
570
571         return -1;
572 }
573
574 int class_name2dev(const char *name)
575 {
576         int i;
577
578         if (!name)
579                 return -1;
580
581         read_lock(&obd_dev_lock);
582         i = class_name2dev_nolock(name);
583         read_unlock(&obd_dev_lock);
584
585         return i;
586 }
587 EXPORT_SYMBOL(class_name2dev);
588
589 struct obd_device *class_name2obd(const char *name)
590 {
591         int dev = class_name2dev(name);
592
593         if (dev < 0 || dev > class_devno_max())
594                 return NULL;
595         return class_num2obd(dev);
596 }
597 EXPORT_SYMBOL(class_name2obd);
598
599 int class_uuid2dev_nolock(struct obd_uuid *uuid)
600 {
601         int i;
602
603         for (i = 0; i < class_devno_max(); i++) {
604                 struct obd_device *obd = class_num2obd(i);
605
606                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
607                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
608                         return i;
609                 }
610         }
611
612         return -1;
613 }
614
615 int class_uuid2dev(struct obd_uuid *uuid)
616 {
617         int i;
618
619         read_lock(&obd_dev_lock);
620         i = class_uuid2dev_nolock(uuid);
621         read_unlock(&obd_dev_lock);
622
623         return i;
624 }
625 EXPORT_SYMBOL(class_uuid2dev);
626
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
628 {
629         int dev = class_uuid2dev(uuid);
630         if (dev < 0)
631                 return NULL;
632         return class_num2obd(dev);
633 }
634 EXPORT_SYMBOL(class_uuid2obd);
635
636 /**
637  * Get obd device from ::obd_devs[]
638  *
639  * \param num [in] array index
640  *
641  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
642  *         otherwise return the obd device there.
643  */
644 struct obd_device *class_num2obd(int num)
645 {
646         struct obd_device *obd = NULL;
647
648         if (num < class_devno_max()) {
649                 obd = obd_devs[num];
650                 if (obd == NULL)
651                         return NULL;
652
653                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
654                          "%p obd_magic %08x != %08x\n",
655                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
656                 LASSERTF(obd->obd_minor == num,
657                          "%p obd_minor %0d != %0d\n",
658                          obd, obd->obd_minor, num);
659         }
660
661         return obd;
662 }
663 EXPORT_SYMBOL(class_num2obd);
664
665 /**
666  * Find obd in obd_dev[] by name or uuid.
667  *
668  * Increment obd's refcount if found.
669  *
670  * \param[in] str obd name or uuid
671  *
672  * \retval NULL    if not found
673  * \retval target  pointer to found obd_device
674  */
675 struct obd_device *class_dev_by_str(const char *str)
676 {
677         struct obd_device *target = NULL;
678         struct obd_uuid tgtuuid;
679         int rc;
680
681         obd_str2uuid(&tgtuuid, str);
682
683         read_lock(&obd_dev_lock);
684         rc = class_uuid2dev_nolock(&tgtuuid);
685         if (rc < 0)
686                 rc = class_name2dev_nolock(str);
687
688         if (rc >= 0)
689                 target = class_num2obd(rc);
690
691         if (target != NULL)
692                 class_incref(target, "find", current);
693         read_unlock(&obd_dev_lock);
694
695         RETURN(target);
696 }
697 EXPORT_SYMBOL(class_dev_by_str);
698
699 /**
700  * Get obd devices count. Device in any
701  *    state are counted
702  * \retval obd device count
703  */
704 int get_devices_count(void)
705 {
706         int index, max_index = class_devno_max(), dev_count = 0;
707
708         read_lock(&obd_dev_lock);
709         for (index = 0; index <= max_index; index++) {
710                 struct obd_device *obd = class_num2obd(index);
711                 if (obd != NULL)
712                         dev_count++;
713         }
714         read_unlock(&obd_dev_lock);
715
716         return dev_count;
717 }
718 EXPORT_SYMBOL(get_devices_count);
719
720 void class_obd_list(void)
721 {
722         char *status;
723         int i;
724
725         read_lock(&obd_dev_lock);
726         for (i = 0; i < class_devno_max(); i++) {
727                 struct obd_device *obd = class_num2obd(i);
728
729                 if (obd == NULL)
730                         continue;
731                 if (obd->obd_stopping)
732                         status = "ST";
733                 else if (obd->obd_set_up)
734                         status = "UP";
735                 else if (obd->obd_attached)
736                         status = "AT";
737                 else
738                         status = "--";
739                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
740                          i, status, obd->obd_type->typ_name,
741                          obd->obd_name, obd->obd_uuid.uuid,
742                          atomic_read(&obd->obd_refcount));
743         }
744         read_unlock(&obd_dev_lock);
745 }
746
747 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
748  * specified, then only the client with that uuid is returned,
749  * otherwise any client connected to the tgt is returned.
750  */
751 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
752                                          const char *type_name,
753                                          struct obd_uuid *grp_uuid)
754 {
755         int i;
756
757         read_lock(&obd_dev_lock);
758         for (i = 0; i < class_devno_max(); i++) {
759                 struct obd_device *obd = class_num2obd(i);
760
761                 if (obd == NULL)
762                         continue;
763                 if ((strncmp(obd->obd_type->typ_name, type_name,
764                              strlen(type_name)) == 0)) {
765                         if (obd_uuid_equals(tgt_uuid,
766                                             &obd->u.cli.cl_target_uuid) &&
767                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
768                                                          &obd->obd_uuid) : 1)) {
769                                 read_unlock(&obd_dev_lock);
770                                 return obd;
771                         }
772                 }
773         }
774         read_unlock(&obd_dev_lock);
775
776         return NULL;
777 }
778 EXPORT_SYMBOL(class_find_client_obd);
779
780 /* Iterate the obd_device list looking devices have grp_uuid. Start
781  * searching at *next, and if a device is found, the next index to look
782  * at is saved in *next. If next is NULL, then the first matching device
783  * will always be returned.
784  */
785 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
786 {
787         int i;
788
789         if (next == NULL)
790                 i = 0;
791         else if (*next >= 0 && *next < class_devno_max())
792                 i = *next;
793         else
794                 return NULL;
795
796         read_lock(&obd_dev_lock);
797         for (; i < class_devno_max(); i++) {
798                 struct obd_device *obd = class_num2obd(i);
799
800                 if (obd == NULL)
801                         continue;
802                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
803                         if (next != NULL)
804                                 *next = i+1;
805                         read_unlock(&obd_dev_lock);
806                         return obd;
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_devices_in_group);
814
815 /**
816  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
817  * adjust sptlrpc settings accordingly.
818  */
819 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
820 {
821         struct obd_device  *obd;
822         const char         *type;
823         int                 i, rc = 0, rc2;
824
825         LASSERT(namelen > 0);
826
827         read_lock(&obd_dev_lock);
828         for (i = 0; i < class_devno_max(); i++) {
829                 obd = class_num2obd(i);
830
831                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
832                         continue;
833
834                 /* only notify mdc, osc, osp, lwp, mdt, ost
835                  * because only these have a -sptlrpc llog */
836                 type = obd->obd_type->typ_name;
837                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
838                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
839                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
840                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
841                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OST_NAME) != 0)
843                         continue;
844
845                 if (strncmp(obd->obd_name, fsname, namelen))
846                         continue;
847
848                 class_incref(obd, __FUNCTION__, obd);
849                 read_unlock(&obd_dev_lock);
850                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
851                                          sizeof(KEY_SPTLRPC_CONF),
852                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
853                 rc = rc ? rc : rc2;
854                 class_decref(obd, __FUNCTION__, obd);
855                 read_lock(&obd_dev_lock);
856         }
857         read_unlock(&obd_dev_lock);
858         return rc;
859 }
860 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
861
862 void obd_cleanup_caches(void)
863 {
864         ENTRY;
865         if (obd_device_cachep) {
866                 kmem_cache_destroy(obd_device_cachep);
867                 obd_device_cachep = NULL;
868         }
869
870         EXIT;
871 }
872
873 int obd_init_caches(void)
874 {
875         int rc;
876         ENTRY;
877
878         LASSERT(obd_device_cachep == NULL);
879         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
880                                 sizeof(struct obd_device),
881                                 0, 0, 0, sizeof(struct obd_device), NULL);
882         if (!obd_device_cachep)
883                 GOTO(out, rc = -ENOMEM);
884
885         RETURN(0);
886 out:
887         obd_cleanup_caches();
888         RETURN(rc);
889 }
890
891 static const char export_handle_owner[] = "export";
892
893 /* map connection to client */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
895 {
896         struct obd_export *export;
897         ENTRY;
898
899         if (!conn) {
900                 CDEBUG(D_CACHE, "looking for null handle\n");
901                 RETURN(NULL);
902         }
903
904         if (conn->cookie == -1) {  /* this means assign a new connection */
905                 CDEBUG(D_CACHE, "want a new connection\n");
906                 RETURN(NULL);
907         }
908
909         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910         export = class_handle2object(conn->cookie, export_handle_owner);
911         RETURN(export);
912 }
913 EXPORT_SYMBOL(class_conn2export);
914
915 struct obd_device *class_exp2obd(struct obd_export *exp)
916 {
917         if (exp)
918                 return exp->exp_obd;
919         return NULL;
920 }
921 EXPORT_SYMBOL(class_exp2obd);
922
923 struct obd_import *class_exp2cliimp(struct obd_export *exp)
924 {
925         struct obd_device *obd = exp->exp_obd;
926         if (obd == NULL)
927                 return NULL;
928         return obd->u.cli.cl_import;
929 }
930 EXPORT_SYMBOL(class_exp2cliimp);
931
932 /* Export management functions */
933 static void class_export_destroy(struct obd_export *exp)
934 {
935         struct obd_device *obd = exp->exp_obd;
936         ENTRY;
937
938         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
939         LASSERT(obd != NULL);
940
941         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
942                exp->exp_client_uuid.uuid, obd->obd_name);
943
944         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
945         ptlrpc_connection_put(exp->exp_connection);
946
947         LASSERT(list_empty(&exp->exp_outstanding_replies));
948         LASSERT(list_empty(&exp->exp_uncommitted_replies));
949         LASSERT(list_empty(&exp->exp_req_replay_queue));
950         LASSERT(list_empty(&exp->exp_hp_rpcs));
951         obd_destroy_export(exp);
952         /* self export doesn't hold a reference to an obd, although it
953          * exists until freeing of the obd */
954         if (exp != obd->obd_self_export)
955                 class_decref(obd, "export", exp);
956
957         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
958         kfree_rcu(exp, exp_handle.h_rcu);
959         EXIT;
960 }
961
962 struct obd_export *class_export_get(struct obd_export *exp)
963 {
964         refcount_inc(&exp->exp_handle.h_ref);
965         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
966                refcount_read(&exp->exp_handle.h_ref));
967         return exp;
968 }
969 EXPORT_SYMBOL(class_export_get);
970
971 void class_export_put(struct obd_export *exp)
972 {
973         LASSERT(exp != NULL);
974         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
975         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
976         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
977                refcount_read(&exp->exp_handle.h_ref) - 1);
978
979         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
980                 struct obd_device *obd = exp->exp_obd;
981
982                 CDEBUG(D_IOCTL, "final put %p/%s\n",
983                        exp, exp->exp_client_uuid.uuid);
984
985                 /* release nid stat refererence */
986                 lprocfs_exp_cleanup(exp);
987
988                 if (exp == obd->obd_self_export) {
989                         /* self export should be destroyed without
990                          * zombie thread as it doesn't hold a
991                          * reference to obd and doesn't hold any
992                          * resources */
993                         class_export_destroy(exp);
994                         /* self export is destroyed, no class
995                          * references exist and it is safe to free
996                          * obd */
997                         class_free_dev(obd);
998                 } else {
999                         LASSERT(!list_empty(&exp->exp_obd_chain));
1000                         obd_zombie_export_add(exp);
1001                 }
1002
1003         }
1004 }
1005 EXPORT_SYMBOL(class_export_put);
1006
1007 static void obd_zombie_exp_cull(struct work_struct *ws)
1008 {
1009         struct obd_export *export;
1010
1011         export = container_of(ws, struct obd_export, exp_zombie_work);
1012         class_export_destroy(export);
1013 }
1014
1015 /* Creates a new export, adds it to the hash table, and returns a
1016  * pointer to it. The refcount is 2: one for the hash reference, and
1017  * one for the pointer returned by this function. */
1018 struct obd_export *__class_new_export(struct obd_device *obd,
1019                                       struct obd_uuid *cluuid, bool is_self)
1020 {
1021         struct obd_export *export;
1022         int rc = 0;
1023         ENTRY;
1024
1025         OBD_ALLOC_PTR(export);
1026         if (!export)
1027                 return ERR_PTR(-ENOMEM);
1028
1029         export->exp_conn_cnt = 0;
1030         export->exp_lock_hash = NULL;
1031         export->exp_flock_hash = NULL;
1032         /* 2 = class_handle_hash + last */
1033         refcount_set(&export->exp_handle.h_ref, 2);
1034         atomic_set(&export->exp_rpc_count, 0);
1035         atomic_set(&export->exp_cb_count, 0);
1036         atomic_set(&export->exp_locks_count, 0);
1037 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1038         INIT_LIST_HEAD(&export->exp_locks_list);
1039         spin_lock_init(&export->exp_locks_list_guard);
1040 #endif
1041         atomic_set(&export->exp_replay_count, 0);
1042         export->exp_obd = obd;
1043         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1044         spin_lock_init(&export->exp_uncommitted_replies_lock);
1045         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1046         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1047         INIT_HLIST_NODE(&export->exp_handle.h_link);
1048         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1049         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1050         class_handle_hash(&export->exp_handle, export_handle_owner);
1051         export->exp_last_request_time = ktime_get_real_seconds();
1052         spin_lock_init(&export->exp_lock);
1053         spin_lock_init(&export->exp_rpc_lock);
1054         INIT_HLIST_NODE(&export->exp_gen_hash);
1055         spin_lock_init(&export->exp_bl_list_lock);
1056         INIT_LIST_HEAD(&export->exp_bl_list);
1057         INIT_LIST_HEAD(&export->exp_stale_list);
1058         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1059
1060         export->exp_sp_peer = LUSTRE_SP_ANY;
1061         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1062         export->exp_client_uuid = *cluuid;
1063         obd_init_export(export);
1064
1065         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1066
1067         spin_lock(&obd->obd_dev_lock);
1068         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1069                 /* shouldn't happen, but might race */
1070                 if (obd->obd_stopping)
1071                         GOTO(exit_unlock, rc = -ENODEV);
1072
1073                 rc = obd_uuid_add(obd, export);
1074                 if (rc != 0) {
1075                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1076                                       obd->obd_name, cluuid->uuid, rc);
1077                         GOTO(exit_unlock, rc = -EALREADY);
1078                 }
1079         }
1080
1081         if (!is_self) {
1082                 class_incref(obd, "export", export);
1083                 list_add_tail(&export->exp_obd_chain_timed,
1084                               &obd->obd_exports_timed);
1085                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1086                 obd->obd_num_exports++;
1087         } else {
1088                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1089                 INIT_LIST_HEAD(&export->exp_obd_chain);
1090         }
1091         spin_unlock(&obd->obd_dev_lock);
1092         RETURN(export);
1093
1094 exit_unlock:
1095         spin_unlock(&obd->obd_dev_lock);
1096         class_handle_unhash(&export->exp_handle);
1097         obd_destroy_export(export);
1098         OBD_FREE_PTR(export);
1099         return ERR_PTR(rc);
1100 }
1101
1102 struct obd_export *class_new_export(struct obd_device *obd,
1103                                     struct obd_uuid *uuid)
1104 {
1105         return __class_new_export(obd, uuid, false);
1106 }
1107 EXPORT_SYMBOL(class_new_export);
1108
1109 struct obd_export *class_new_export_self(struct obd_device *obd,
1110                                          struct obd_uuid *uuid)
1111 {
1112         return __class_new_export(obd, uuid, true);
1113 }
1114
1115 void class_unlink_export(struct obd_export *exp)
1116 {
1117         class_handle_unhash(&exp->exp_handle);
1118
1119         if (exp->exp_obd->obd_self_export == exp) {
1120                 class_export_put(exp);
1121                 return;
1122         }
1123
1124         spin_lock(&exp->exp_obd->obd_dev_lock);
1125         /* delete an uuid-export hashitem from hashtables */
1126         if (exp != exp->exp_obd->obd_self_export)
1127                 obd_uuid_del(exp->exp_obd, exp);
1128
1129 #ifdef HAVE_SERVER_SUPPORT
1130         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1131                 struct tg_export_data   *ted = &exp->exp_target_data;
1132                 struct cfs_hash         *hash;
1133
1134                 /* Because obd_gen_hash will not be released until
1135                  * class_cleanup(), so hash should never be NULL here */
1136                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1137                 LASSERT(hash != NULL);
1138                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1139                              &exp->exp_gen_hash);
1140                 cfs_hash_putref(hash);
1141         }
1142 #endif /* HAVE_SERVER_SUPPORT */
1143
1144         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1145         list_del_init(&exp->exp_obd_chain_timed);
1146         exp->exp_obd->obd_num_exports--;
1147         spin_unlock(&exp->exp_obd->obd_dev_lock);
1148         atomic_inc(&obd_stale_export_num);
1149
1150         /* A reference is kept by obd_stale_exports list */
1151         obd_stale_export_put(exp);
1152 }
1153 EXPORT_SYMBOL(class_unlink_export);
1154
1155 /* Import management functions */
1156 static void obd_zombie_import_free(struct obd_import *imp)
1157 {
1158         ENTRY;
1159
1160         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1161                imp->imp_obd->obd_name);
1162
1163         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1164
1165         ptlrpc_connection_put(imp->imp_connection);
1166
1167         while (!list_empty(&imp->imp_conn_list)) {
1168                 struct obd_import_conn *imp_conn;
1169
1170                 imp_conn = list_first_entry(&imp->imp_conn_list,
1171                                             struct obd_import_conn, oic_item);
1172                 list_del_init(&imp_conn->oic_item);
1173                 ptlrpc_connection_put(imp_conn->oic_conn);
1174                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1175         }
1176
1177         LASSERT(imp->imp_sec == NULL);
1178         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1179                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1180         class_decref(imp->imp_obd, "import", imp);
1181         OBD_FREE_PTR(imp);
1182         EXIT;
1183 }
1184
1185 struct obd_import *class_import_get(struct obd_import *import)
1186 {
1187         refcount_inc(&import->imp_refcount);
1188         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1189                refcount_read(&import->imp_refcount),
1190                import->imp_obd->obd_name);
1191         return import;
1192 }
1193 EXPORT_SYMBOL(class_import_get);
1194
1195 void class_import_put(struct obd_import *imp)
1196 {
1197         ENTRY;
1198
1199         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1200
1201         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1202                refcount_read(&imp->imp_refcount) - 1,
1203                imp->imp_obd->obd_name);
1204
1205         if (refcount_dec_and_test(&imp->imp_refcount)) {
1206                 CDEBUG(D_INFO, "final put import %p\n", imp);
1207                 obd_zombie_import_add(imp);
1208         }
1209
1210         EXIT;
1211 }
1212 EXPORT_SYMBOL(class_import_put);
1213
1214 static void init_imp_at(struct imp_at *at) {
1215         int i;
1216         at_init(&at->iat_net_latency, 0, 0);
1217         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1218                 /* max service estimates are tracked on the server side, so
1219                    don't use the AT history here, just use the last reported
1220                    val. (But keep hist for proc histogram, worst_ever) */
1221                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1222                         AT_FLG_NOHIST);
1223         }
1224 }
1225
1226 static void obd_zombie_imp_cull(struct work_struct *ws)
1227 {
1228         struct obd_import *import;
1229
1230         import = container_of(ws, struct obd_import, imp_zombie_work);
1231         obd_zombie_import_free(import);
1232 }
1233
1234 struct obd_import *class_new_import(struct obd_device *obd)
1235 {
1236         struct obd_import *imp;
1237         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1238
1239         OBD_ALLOC(imp, sizeof(*imp));
1240         if (imp == NULL)
1241                 return NULL;
1242
1243         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1244         INIT_LIST_HEAD(&imp->imp_replay_list);
1245         INIT_LIST_HEAD(&imp->imp_sending_list);
1246         INIT_LIST_HEAD(&imp->imp_delayed_list);
1247         INIT_LIST_HEAD(&imp->imp_committed_list);
1248         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1249         imp->imp_known_replied_xid = 0;
1250         imp->imp_replay_cursor = &imp->imp_committed_list;
1251         spin_lock_init(&imp->imp_lock);
1252         imp->imp_last_success_conn = 0;
1253         imp->imp_state = LUSTRE_IMP_NEW;
1254         imp->imp_obd = class_incref(obd, "import", imp);
1255         rwlock_init(&imp->imp_sec_lock);
1256         init_waitqueue_head(&imp->imp_recovery_waitq);
1257         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1258
1259         if (curr_pid_ns && curr_pid_ns->child_reaper)
1260                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1261         else
1262                 imp->imp_sec_refpid = 1;
1263
1264         refcount_set(&imp->imp_refcount, 2);
1265         atomic_set(&imp->imp_unregistering, 0);
1266         atomic_set(&imp->imp_reqs, 0);
1267         atomic_set(&imp->imp_inflight, 0);
1268         atomic_set(&imp->imp_replay_inflight, 0);
1269         init_waitqueue_head(&imp->imp_replay_waitq);
1270         atomic_set(&imp->imp_inval_count, 0);
1271         INIT_LIST_HEAD(&imp->imp_conn_list);
1272         init_imp_at(&imp->imp_at);
1273
1274         /* the default magic is V2, will be used in connect RPC, and
1275          * then adjusted according to the flags in request/reply. */
1276         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1277
1278         return imp;
1279 }
1280 EXPORT_SYMBOL(class_new_import);
1281
1282 void class_destroy_import(struct obd_import *import)
1283 {
1284         LASSERT(import != NULL);
1285         LASSERT(import != LP_POISON);
1286
1287         spin_lock(&import->imp_lock);
1288         import->imp_generation++;
1289         spin_unlock(&import->imp_lock);
1290         class_import_put(import);
1291 }
1292 EXPORT_SYMBOL(class_destroy_import);
1293
1294 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1295
1296 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1297 {
1298         spin_lock(&exp->exp_locks_list_guard);
1299
1300         LASSERT(lock->l_exp_refs_nr >= 0);
1301
1302         if (lock->l_exp_refs_target != NULL &&
1303             lock->l_exp_refs_target != exp) {
1304                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1305                               exp, lock, lock->l_exp_refs_target);
1306         }
1307         if ((lock->l_exp_refs_nr ++) == 0) {
1308                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1309                 lock->l_exp_refs_target = exp;
1310         }
1311         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1312                lock, exp, lock->l_exp_refs_nr);
1313         spin_unlock(&exp->exp_locks_list_guard);
1314 }
1315 EXPORT_SYMBOL(__class_export_add_lock_ref);
1316
1317 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1318 {
1319         spin_lock(&exp->exp_locks_list_guard);
1320         LASSERT(lock->l_exp_refs_nr > 0);
1321         if (lock->l_exp_refs_target != exp) {
1322                 LCONSOLE_WARN("lock %p, "
1323                               "mismatching export pointers: %p, %p\n",
1324                               lock, lock->l_exp_refs_target, exp);
1325         }
1326         if (-- lock->l_exp_refs_nr == 0) {
1327                 list_del_init(&lock->l_exp_refs_link);
1328                 lock->l_exp_refs_target = NULL;
1329         }
1330         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1331                lock, exp, lock->l_exp_refs_nr);
1332         spin_unlock(&exp->exp_locks_list_guard);
1333 }
1334 EXPORT_SYMBOL(__class_export_del_lock_ref);
1335 #endif
1336
1337 /* A connection defines an export context in which preallocation can
1338    be managed. This releases the export pointer reference, and returns
1339    the export handle, so the export refcount is 1 when this function
1340    returns. */
1341 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1342                   struct obd_uuid *cluuid)
1343 {
1344         struct obd_export *export;
1345         LASSERT(conn != NULL);
1346         LASSERT(obd != NULL);
1347         LASSERT(cluuid != NULL);
1348         ENTRY;
1349
1350         export = class_new_export(obd, cluuid);
1351         if (IS_ERR(export))
1352                 RETURN(PTR_ERR(export));
1353
1354         conn->cookie = export->exp_handle.h_cookie;
1355         class_export_put(export);
1356
1357         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1358                cluuid->uuid, conn->cookie);
1359         RETURN(0);
1360 }
1361 EXPORT_SYMBOL(class_connect);
1362
1363 /* if export is involved in recovery then clean up related things */
1364 static void class_export_recovery_cleanup(struct obd_export *exp)
1365 {
1366         struct obd_device *obd = exp->exp_obd;
1367
1368         spin_lock(&obd->obd_recovery_task_lock);
1369         if (obd->obd_recovering) {
1370                 if (exp->exp_in_recovery) {
1371                         spin_lock(&exp->exp_lock);
1372                         exp->exp_in_recovery = 0;
1373                         spin_unlock(&exp->exp_lock);
1374                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1375                         atomic_dec(&obd->obd_connected_clients);
1376                 }
1377
1378                 /* if called during recovery then should update
1379                  * obd_stale_clients counter,
1380                  * lightweight exports are not counted */
1381                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1382                         exp->exp_obd->obd_stale_clients++;
1383         }
1384         spin_unlock(&obd->obd_recovery_task_lock);
1385
1386         spin_lock(&exp->exp_lock);
1387         /** Cleanup req replay fields */
1388         if (exp->exp_req_replay_needed) {
1389                 exp->exp_req_replay_needed = 0;
1390
1391                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1392                 atomic_dec(&obd->obd_req_replay_clients);
1393         }
1394
1395         /** Cleanup lock replay data */
1396         if (exp->exp_lock_replay_needed) {
1397                 exp->exp_lock_replay_needed = 0;
1398
1399                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1400                 atomic_dec(&obd->obd_lock_replay_clients);
1401         }
1402         spin_unlock(&exp->exp_lock);
1403 }
1404
1405 /* This function removes 1-3 references from the export:
1406  * 1 - for export pointer passed
1407  * and if disconnect really need
1408  * 2 - removing from hash
1409  * 3 - in client_unlink_export
1410  * The export pointer passed to this function can destroyed */
1411 int class_disconnect(struct obd_export *export)
1412 {
1413         int already_disconnected;
1414         ENTRY;
1415
1416         if (export == NULL) {
1417                 CWARN("attempting to free NULL export %p\n", export);
1418                 RETURN(-EINVAL);
1419         }
1420
1421         spin_lock(&export->exp_lock);
1422         already_disconnected = export->exp_disconnected;
1423         export->exp_disconnected = 1;
1424 #ifdef HAVE_SERVER_SUPPORT
1425         /*  We hold references of export for uuid hash
1426          *  and nid_hash and export link at least. So
1427          *  it is safe to call rh*table_remove_fast in
1428          *  there.
1429          */
1430         obd_nid_del(export->exp_obd, export);
1431 #endif /* HAVE_SERVER_SUPPORT */
1432         spin_unlock(&export->exp_lock);
1433
1434         /* class_cleanup(), abort_recovery(), and class_fail_export()
1435          * all end up in here, and if any of them race we shouldn't
1436          * call extra class_export_puts(). */
1437         if (already_disconnected)
1438                 GOTO(no_disconn, already_disconnected);
1439
1440         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1441                export->exp_handle.h_cookie);
1442
1443         class_export_recovery_cleanup(export);
1444         class_unlink_export(export);
1445 no_disconn:
1446         class_export_put(export);
1447         RETURN(0);
1448 }
1449 EXPORT_SYMBOL(class_disconnect);
1450
1451 /* Return non-zero for a fully connected export */
1452 int class_connected_export(struct obd_export *exp)
1453 {
1454         int connected = 0;
1455
1456         if (exp) {
1457                 spin_lock(&exp->exp_lock);
1458                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1459                 spin_unlock(&exp->exp_lock);
1460         }
1461         return connected;
1462 }
1463 EXPORT_SYMBOL(class_connected_export);
1464
1465 static void class_disconnect_export_list(struct list_head *list,
1466                                          enum obd_option flags)
1467 {
1468         int rc;
1469         struct obd_export *exp;
1470         ENTRY;
1471
1472         /* It's possible that an export may disconnect itself, but
1473          * nothing else will be added to this list. */
1474         while (!list_empty(list)) {
1475                 exp = list_first_entry(list, struct obd_export,
1476                                        exp_obd_chain);
1477                 /* need for safe call CDEBUG after obd_disconnect */
1478                 class_export_get(exp);
1479
1480                 spin_lock(&exp->exp_lock);
1481                 exp->exp_flags = flags;
1482                 spin_unlock(&exp->exp_lock);
1483
1484                 if (obd_uuid_equals(&exp->exp_client_uuid,
1485                                     &exp->exp_obd->obd_uuid)) {
1486                         CDEBUG(D_HA,
1487                                "exp %p export uuid == obd uuid, don't discon\n",
1488                                exp);
1489                         /* Need to delete this now so we don't end up pointing
1490                          * to work_list later when this export is cleaned up. */
1491                         list_del_init(&exp->exp_obd_chain);
1492                         class_export_put(exp);
1493                         continue;
1494                 }
1495
1496                 class_export_get(exp);
1497                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1498                        "last request at %lld\n",
1499                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1500                        exp, exp->exp_last_request_time);
1501                 /* release one export reference anyway */
1502                 rc = obd_disconnect(exp);
1503
1504                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1505                        obd_export_nid2str(exp), exp, rc);
1506                 class_export_put(exp);
1507         }
1508         EXIT;
1509 }
1510
1511 void class_disconnect_exports(struct obd_device *obd)
1512 {
1513         LIST_HEAD(work_list);
1514         ENTRY;
1515
1516         /* Move all of the exports from obd_exports to a work list, en masse. */
1517         spin_lock(&obd->obd_dev_lock);
1518         list_splice_init(&obd->obd_exports, &work_list);
1519         list_splice_init(&obd->obd_delayed_exports, &work_list);
1520         spin_unlock(&obd->obd_dev_lock);
1521
1522         if (!list_empty(&work_list)) {
1523                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1524                        "disconnecting them\n", obd->obd_minor, obd);
1525                 class_disconnect_export_list(&work_list,
1526                                              exp_flags_from_obd(obd));
1527         } else
1528                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1529                        obd->obd_minor, obd);
1530         EXIT;
1531 }
1532 EXPORT_SYMBOL(class_disconnect_exports);
1533
1534 /* Remove exports that have not completed recovery.
1535  */
1536 void class_disconnect_stale_exports(struct obd_device *obd,
1537                                     int (*test_export)(struct obd_export *))
1538 {
1539         LIST_HEAD(work_list);
1540         struct obd_export *exp, *n;
1541         int evicted = 0;
1542         ENTRY;
1543
1544         spin_lock(&obd->obd_dev_lock);
1545         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1546                                  exp_obd_chain) {
1547                 /* don't count self-export as client */
1548                 if (obd_uuid_equals(&exp->exp_client_uuid,
1549                                     &exp->exp_obd->obd_uuid))
1550                         continue;
1551
1552                 /* don't evict clients which have no slot in last_rcvd
1553                  * (e.g. lightweight connection) */
1554                 if (exp->exp_target_data.ted_lr_idx == -1)
1555                         continue;
1556
1557                 spin_lock(&exp->exp_lock);
1558                 if (exp->exp_failed || test_export(exp)) {
1559                         spin_unlock(&exp->exp_lock);
1560                         continue;
1561                 }
1562                 exp->exp_failed = 1;
1563                 spin_unlock(&exp->exp_lock);
1564
1565                 list_move(&exp->exp_obd_chain, &work_list);
1566                 evicted++;
1567                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1568                        obd->obd_name, exp->exp_client_uuid.uuid,
1569                        obd_export_nid2str(exp));
1570                 print_export_data(exp, "EVICTING", 0, D_HA);
1571         }
1572         spin_unlock(&obd->obd_dev_lock);
1573
1574         if (evicted)
1575                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1576                               obd->obd_name, evicted);
1577
1578         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1579                                                  OBD_OPT_ABORT_RECOV);
1580         EXIT;
1581 }
1582 EXPORT_SYMBOL(class_disconnect_stale_exports);
1583
1584 void class_fail_export(struct obd_export *exp)
1585 {
1586         int rc, already_failed;
1587
1588         spin_lock(&exp->exp_lock);
1589         already_failed = exp->exp_failed;
1590         exp->exp_failed = 1;
1591         spin_unlock(&exp->exp_lock);
1592
1593         if (already_failed) {
1594                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1595                        exp, exp->exp_client_uuid.uuid);
1596                 return;
1597         }
1598
1599         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1600                exp, exp->exp_client_uuid.uuid);
1601
1602         if (obd_dump_on_timeout)
1603                 libcfs_debug_dumplog();
1604
1605         /* need for safe call CDEBUG after obd_disconnect */
1606         class_export_get(exp);
1607
1608         /* Most callers into obd_disconnect are removing their own reference
1609          * (request, for example) in addition to the one from the hash table.
1610          * We don't have such a reference here, so make one. */
1611         class_export_get(exp);
1612         rc = obd_disconnect(exp);
1613         if (rc)
1614                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1615         else
1616                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1617                        exp, exp->exp_client_uuid.uuid);
1618         class_export_put(exp);
1619 }
1620 EXPORT_SYMBOL(class_fail_export);
1621
1622 #ifdef HAVE_SERVER_SUPPORT
1623
1624 static int take_first(struct obd_export *exp, void *data)
1625 {
1626         struct obd_export **expp = data;
1627
1628         if (*expp)
1629                 /* already have one */
1630                 return 0;
1631         if (exp->exp_failed)
1632                 /* Don't want this one */
1633                 return 0;
1634         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1635                 /* Cannot get a ref on this one */
1636                 return 0;
1637         *expp = exp;
1638         return 1;
1639 }
1640
1641 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1642 {
1643         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1644         struct obd_export *doomed_exp;
1645         int exports_evicted = 0;
1646
1647         spin_lock(&obd->obd_dev_lock);
1648         /* umount has run already, so evict thread should leave
1649          * its task to umount thread now */
1650         if (obd->obd_stopping) {
1651                 spin_unlock(&obd->obd_dev_lock);
1652                 return exports_evicted;
1653         }
1654         spin_unlock(&obd->obd_dev_lock);
1655
1656         doomed_exp = NULL;
1657         while (obd_nid_export_for_each(obd, nid_key,
1658                                        take_first, &doomed_exp) > 0) {
1659
1660                 LASSERTF(doomed_exp != obd->obd_self_export,
1661                          "self-export is hashed by NID?\n");
1662
1663                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1664                               obd->obd_name,
1665                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1666                               obd_export_nid2str(doomed_exp));
1667
1668                 class_fail_export(doomed_exp);
1669                 class_export_put(doomed_exp);
1670                 exports_evicted++;
1671                 doomed_exp = NULL;
1672         }
1673
1674         if (!exports_evicted)
1675                 CDEBUG(D_HA,
1676                        "%s: can't disconnect NID '%s': no exports found\n",
1677                        obd->obd_name, nid);
1678         return exports_evicted;
1679 }
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1681
1682 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1683 {
1684         struct obd_export *doomed_exp = NULL;
1685         struct obd_uuid doomed_uuid;
1686         int exports_evicted = 0;
1687
1688         spin_lock(&obd->obd_dev_lock);
1689         if (obd->obd_stopping) {
1690                 spin_unlock(&obd->obd_dev_lock);
1691                 return exports_evicted;
1692         }
1693         spin_unlock(&obd->obd_dev_lock);
1694
1695         obd_str2uuid(&doomed_uuid, uuid);
1696         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1697                 CERROR("%s: can't evict myself\n", obd->obd_name);
1698                 return exports_evicted;
1699         }
1700
1701         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1702         if (doomed_exp == NULL) {
1703                 CERROR("%s: can't disconnect %s: no exports found\n",
1704                        obd->obd_name, uuid);
1705         } else {
1706                 CWARN("%s: evicting %s at adminstrative request\n",
1707                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1708                 class_fail_export(doomed_exp);
1709                 class_export_put(doomed_exp);
1710                 obd_uuid_del(obd, doomed_exp);
1711                 exports_evicted++;
1712         }
1713
1714         return exports_evicted;
1715 }
1716 #endif /* HAVE_SERVER_SUPPORT */
1717
1718 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback: print_export_data() calls it on an export when its
 * 'locks' argument is non-zero and this pointer is not NULL.
 */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
1720 EXPORT_SYMBOL(class_export_dump_hook);
1721 #endif
1722
1723 static void print_export_data(struct obd_export *exp, const char *status,
1724                               int locks, int debug_level)
1725 {
1726         struct ptlrpc_reply_state *rs;
1727         struct ptlrpc_reply_state *first_reply = NULL;
1728         int nreplies = 0;
1729
1730         spin_lock(&exp->exp_lock);
1731         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1732                             rs_exp_list) {
1733                 if (nreplies == 0)
1734                         first_reply = rs;
1735                 nreplies++;
1736         }
1737         spin_unlock(&exp->exp_lock);
1738
1739         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1740                "%p %s %llu stale:%d\n",
1741                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1742                obd_export_nid2str(exp),
1743                refcount_read(&exp->exp_handle.h_ref),
1744                atomic_read(&exp->exp_rpc_count),
1745                atomic_read(&exp->exp_cb_count),
1746                atomic_read(&exp->exp_locks_count),
1747                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1748                nreplies, first_reply, nreplies > 3 ? "..." : "",
1749                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1750 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1751         if (locks && class_export_dump_hook != NULL)
1752                 class_export_dump_hook(exp);
1753 #endif
1754 }
1755
1756 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1757 {
1758         struct obd_export *exp;
1759
1760         spin_lock(&obd->obd_dev_lock);
1761         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1762                 print_export_data(exp, "ACTIVE", locks, debug_level);
1763         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1764                 print_export_data(exp, "UNLINKED", locks, debug_level);
1765         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1766                 print_export_data(exp, "DELAYED", locks, debug_level);
1767         spin_unlock(&obd->obd_dev_lock);
1768 }
1769
1770 void obd_exports_barrier(struct obd_device *obd)
1771 {
1772         int waited = 2;
1773         LASSERT(list_empty(&obd->obd_exports));
1774         spin_lock(&obd->obd_dev_lock);
1775         while (!list_empty(&obd->obd_unlinked_exports)) {
1776                 spin_unlock(&obd->obd_dev_lock);
1777                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1778                 if (waited > 5 && is_power_of_2(waited)) {
1779                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1780                                       "more than %d seconds. "
1781                                       "The obd refcount = %d. Is it stuck?\n",
1782                                       obd->obd_name, waited,
1783                                       atomic_read(&obd->obd_refcount));
1784                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1785                 }
1786                 waited *= 2;
1787                 spin_lock(&obd->obd_dev_lock);
1788         }
1789         spin_unlock(&obd->obd_dev_lock);
1790 }
1791 EXPORT_SYMBOL(obd_exports_barrier);
1792
1793 /**
1794  * Add export to the obd_zombe thread and notify it.
1795  */
1796 static void obd_zombie_export_add(struct obd_export *exp) {
1797         atomic_dec(&obd_stale_export_num);
1798         spin_lock(&exp->exp_obd->obd_dev_lock);
1799         LASSERT(!list_empty(&exp->exp_obd_chain));
1800         list_del_init(&exp->exp_obd_chain);
1801         spin_unlock(&exp->exp_obd->obd_dev_lock);
1802
1803         queue_work(zombie_wq, &exp->exp_zombie_work);
1804 }
1805
1806 /**
1807  * Add import to the obd_zombe thread and notify it.
1808  */
1809 static void obd_zombie_import_add(struct obd_import *imp) {
1810         LASSERT(imp->imp_sec == NULL);
1811
1812         queue_work(zombie_wq, &imp->imp_zombie_work);
1813 }
1814
/**
 * Wait until all work currently queued on zombie_wq has completed, i.e.
 * until the pending zombie import/export work items have been processed.
 */
void obd_zombie_barrier(void)
{
        flush_workqueue(zombie_wq);
}
1822 EXPORT_SYMBOL(obd_zombie_barrier);
1823
1824
1825 struct obd_export *obd_stale_export_get(void)
1826 {
1827         struct obd_export *exp = NULL;
1828         ENTRY;
1829
1830         spin_lock(&obd_stale_export_lock);
1831         if (!list_empty(&obd_stale_exports)) {
1832                 exp = list_first_entry(&obd_stale_exports,
1833                                        struct obd_export, exp_stale_list);
1834                 list_del_init(&exp->exp_stale_list);
1835         }
1836         spin_unlock(&obd_stale_export_lock);
1837
1838         if (exp) {
1839                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1840                        atomic_read(&obd_stale_export_num));
1841         }
1842         RETURN(exp);
1843 }
1844 EXPORT_SYMBOL(obd_stale_export_get);
1845
1846 void obd_stale_export_put(struct obd_export *exp)
1847 {
1848         ENTRY;
1849
1850         LASSERT(list_empty(&exp->exp_stale_list));
1851         if (exp->exp_lock_hash &&
1852             atomic_read(&exp->exp_lock_hash->hs_count)) {
1853                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1854                        atomic_read(&obd_stale_export_num));
1855
1856                 spin_lock_bh(&exp->exp_bl_list_lock);
1857                 spin_lock(&obd_stale_export_lock);
1858                 /* Add to the tail if there is no blocked locks,
1859                  * to the head otherwise. */
1860                 if (list_empty(&exp->exp_bl_list))
1861                         list_add_tail(&exp->exp_stale_list,
1862                                       &obd_stale_exports);
1863                 else
1864                         list_add(&exp->exp_stale_list,
1865                                  &obd_stale_exports);
1866
1867                 spin_unlock(&obd_stale_export_lock);
1868                 spin_unlock_bh(&exp->exp_bl_list_lock);
1869         } else {
1870                 class_export_put(exp);
1871         }
1872         EXIT;
1873 }
1874 EXPORT_SYMBOL(obd_stale_export_put);
1875
1876 /**
1877  * Adjust the position of the export in the stale list,
1878  * i.e. move to the head of the list if is needed.
1879  **/
1880 void obd_stale_export_adjust(struct obd_export *exp)
1881 {
1882         LASSERT(exp != NULL);
1883         spin_lock_bh(&exp->exp_bl_list_lock);
1884         spin_lock(&obd_stale_export_lock);
1885
1886         if (!list_empty(&exp->exp_stale_list) &&
1887             !list_empty(&exp->exp_bl_list))
1888                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1889
1890         spin_unlock(&obd_stale_export_lock);
1891         spin_unlock_bh(&exp->exp_bl_list_lock);
1892 }
1893 EXPORT_SYMBOL(obd_stale_export_adjust);
1894
1895 /**
1896  * start destroy zombie import/export thread
1897  */
1898 int obd_zombie_impexp_init(void)
1899 {
1900         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1901                                            0, CFS_CPT_ANY,
1902                                            cfs_cpt_number(cfs_cpt_tab));
1903
1904         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1905 }
1906
/**
 * Destroy the zombie import/export workqueue (zombie_wq) and assert that
 * the global obd_stale_exports list is empty afterwards.
 */
void obd_zombie_impexp_stop(void)
{
        destroy_workqueue(zombie_wq);
        LASSERT(list_empty(&obd_stale_exports));
}
1915
1916 /***** Kernel-userspace comm helpers *******/
1917
1918 /* Get length of entire message, including header */
1919 int kuc_len(int payload_len)
1920 {
1921         return sizeof(struct kuc_hdr) + payload_len;
1922 }
1923 EXPORT_SYMBOL(kuc_len);
1924
1925 /* Get a pointer to kuc header, given a ptr to the payload
1926  * @param p Pointer to payload area
1927  * @returns Pointer to kuc header
1928  */
1929 struct kuc_hdr * kuc_ptr(void *p)
1930 {
1931         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1932         LASSERT(lh->kuc_magic == KUC_MAGIC);
1933         return lh;
1934 }
1935 EXPORT_SYMBOL(kuc_ptr);
1936
1937 /* Alloc space for a message, and fill in header
1938  * @return Pointer to payload area
1939  */
1940 void *kuc_alloc(int payload_len, int transport, int type)
1941 {
1942         struct kuc_hdr *lh;
1943         int len = kuc_len(payload_len);
1944
1945         OBD_ALLOC(lh, len);
1946         if (lh == NULL)
1947                 return ERR_PTR(-ENOMEM);
1948
1949         lh->kuc_magic = KUC_MAGIC;
1950         lh->kuc_transport = transport;
1951         lh->kuc_msgtype = type;
1952         lh->kuc_msglen = len;
1953
1954         return (void *)(lh + 1);
1955 }
1956 EXPORT_SYMBOL(kuc_alloc);
1957
/* Free a message, given a pointer to its payload area
 * The header is located with kuc_ptr() and the freed size is
 * kuc_len(@payload_len).
 * @param p Pointer to payload area
 * @param payload_len Length of the payload
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
1964 EXPORT_SYMBOL(kuc_free);
1965
/* Wait record of one caller of obd_get_request_slot() that found no free
 * RPC slot; the record is a local variable of that function.
 */
struct obd_request_slot_waiter {
        /* link in the cl_flight_waiters list of the client_obd */
        struct list_head        orsw_entry;
        /* wait queue the caller sleeps on until it is woken up */
        wait_queue_head_t       orsw_waitq;
        /* set to false before waiting; if found true after the wait,
         * obd_get_request_slot() returns -EINTR */
        bool                    orsw_signaled;
};
1971
1972 static bool obd_request_slot_avail(struct client_obd *cli,
1973                                    struct obd_request_slot_waiter *orsw)
1974 {
1975         bool avail;
1976
1977         spin_lock(&cli->cl_loi_list_lock);
1978         avail = !!list_empty(&orsw->orsw_entry);
1979         spin_unlock(&cli->cl_loi_list_lock);
1980
1981         return avail;
1982 };
1983
/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 *
 * Returns 0 when a slot was acquired, or -EINTR when the wait was aborted
 * or the waiter was marked as signaled.
 */
int obd_get_request_slot(struct client_obd *cli)
{
        struct obd_request_slot_waiter   orsw;
        int                              rc;

        spin_lock(&cli->cl_loi_list_lock);
        /* Fast path: a slot is free, account for it and return. */
        if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
                cli->cl_rpcs_in_flight++;
                spin_unlock(&cli->cl_loi_list_lock);
                return 0;
        }

        /* No free slot: queue ourselves at the tail of the waiters list. */
        init_waitqueue_head(&orsw.orsw_waitq);
        list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
        orsw.orsw_signaled = false;
        spin_unlock(&cli->cl_loi_list_lock);

        /* Sleep until our entry has been removed from the list (the waker
         * has then already counted the slot for us) or we are signaled. */
        rc = l_wait_event_abortable(orsw.orsw_waitq,
                                    obd_request_slot_avail(cli, &orsw) ||
                                    orsw.orsw_signaled);

        /* Here, we must take the lock to avoid the on-stack 'orsw' to be
         * freed but other (such as obd_put_request_slot) is using it. */
        spin_lock(&cli->cl_loi_list_lock);
        if (rc != 0) {
                if (!orsw.orsw_signaled) {
                        /* The wait was aborted.  An empty entry means a
                         * waker already granted us a slot, so give it
                         * back; otherwise just leave the waiters list. */
                        if (list_empty(&orsw.orsw_entry))
                                cli->cl_rpcs_in_flight--;
                        else
                                list_del(&orsw.orsw_entry);
                }
                rc = -EINTR;
        }

        /* NOTE(review): nothing in the code visible here sets
         * orsw_signaled to true - confirm whether this path is live. */
        if (orsw.orsw_signaled) {
                LASSERT(list_empty(&orsw.orsw_entry));

                rc = -EINTR;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        return rc;
}
2034 EXPORT_SYMBOL(obd_get_request_slot);
2035
2036 void obd_put_request_slot(struct client_obd *cli)
2037 {
2038         struct obd_request_slot_waiter *orsw;
2039
2040         spin_lock(&cli->cl_loi_list_lock);
2041         cli->cl_rpcs_in_flight--;
2042
2043         /* If there is free slot, wakeup the first waiter. */
2044         if (!list_empty(&cli->cl_flight_waiters) &&
2045             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2046                 orsw = list_first_entry(&cli->cl_flight_waiters,
2047                                         struct obd_request_slot_waiter,
2048                                         orsw_entry);
2049                 list_del_init(&orsw->orsw_entry);
2050                 cli->cl_rpcs_in_flight++;
2051                 wake_up(&orsw->orsw_waitq);
2052         }
2053         spin_unlock(&cli->cl_loi_list_lock);
2054 }
2055 EXPORT_SYMBOL(obd_put_request_slot);
2056
/* Return the current cl_max_rpcs_in_flight limit of @cli.  The value is
 * read without taking cl_loi_list_lock.
 */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
2061 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2062
2063 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2064 {
2065         struct obd_request_slot_waiter *orsw;
2066         __u32                           old;
2067         int                             diff;
2068         int                             i;
2069         int                             rc;
2070
2071         if (max > OBD_MAX_RIF_MAX || max < 1)
2072                 return -ERANGE;
2073
2074         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2075                cli->cl_import->imp_obd->obd_name, max,
2076                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2077
2078         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2079                    LUSTRE_MDC_NAME) == 0) {
2080                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2081                  * strictly lower that max_rpcs_in_flight */
2082                 if (max < 2) {
2083                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2084                                cli->cl_import->imp_obd->obd_name);
2085                         return -ERANGE;
2086                 }
2087                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2088                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2089                         if (rc != 0)
2090                                 return rc;
2091                 }
2092         }
2093
2094         spin_lock(&cli->cl_loi_list_lock);
2095         old = cli->cl_max_rpcs_in_flight;
2096         cli->cl_max_rpcs_in_flight = max;
2097         client_adjust_max_dirty(cli);
2098
2099         diff = max - old;
2100
2101         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2102         for (i = 0; i < diff; i++) {
2103                 if (list_empty(&cli->cl_flight_waiters))
2104                         break;
2105
2106                 orsw = list_first_entry(&cli->cl_flight_waiters,
2107                                         struct obd_request_slot_waiter,
2108                                         orsw_entry);
2109                 list_del_init(&orsw->orsw_entry);
2110                 cli->cl_rpcs_in_flight++;
2111                 wake_up(&orsw->orsw_waitq);
2112         }
2113         spin_unlock(&cli->cl_loi_list_lock);
2114
2115         return 0;
2116 }
2117 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2118
/* Return the current cl_max_mod_rpcs_in_flight limit of @cli.  The value
 * is read without taking cl_mod_rpcs_lock.
 */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
2123 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2124
2125 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2126 {
2127         struct obd_connect_data *ocd;
2128         __u16 maxmodrpcs;
2129         __u16 prev;
2130
2131         if (max > OBD_MAX_RIF_MAX || max < 1)
2132                 return -ERANGE;
2133
2134         ocd = &cli->cl_import->imp_connect_data;
2135         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2136                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2137                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2138
2139         if (max == OBD_MAX_RIF_MAX)
2140                 max = OBD_MAX_RIF_MAX - 1;
2141
2142         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2143          * increase this value, also bump up max_rpcs_in_flight to match.
2144          */
2145         if (max >= cli->cl_max_rpcs_in_flight) {
2146                 CDEBUG(D_INFO,
2147                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2148                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2149                 obd_set_max_rpcs_in_flight(cli, max + 1);
2150         }
2151
2152         /* cannot exceed max modify RPCs in flight supported by the server,
2153          * but verify ocd_connect_flags is at least initialized first.  If
2154          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2155          */
2156         if (!ocd->ocd_connect_flags) {
2157                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2158         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2159                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2160                 if (maxmodrpcs == 0) { /* connection not finished yet */
2161                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2162                         CDEBUG(D_INFO,
2163                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2164                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2165                 }
2166         } else {
2167                 maxmodrpcs = 1;
2168         }
2169         if (max > maxmodrpcs) {
2170                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2171                        cli->cl_import->imp_obd->obd_name,
2172                        max, maxmodrpcs);
2173                 return -ERANGE;
2174         }
2175
2176         spin_lock(&cli->cl_mod_rpcs_lock);
2177
2178         prev = cli->cl_max_mod_rpcs_in_flight;
2179         cli->cl_max_mod_rpcs_in_flight = max;
2180
2181         /* wakeup waiters if limit has been increased */
2182         if (cli->cl_max_mod_rpcs_in_flight > prev)
2183                 wake_up(&cli->cl_mod_rpcs_waitq);
2184
2185         spin_unlock(&cli->cl_mod_rpcs_lock);
2186
2187         return 0;
2188 }
2189 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2190
2191 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2192                                struct seq_file *seq)
2193 {
2194         unsigned long mod_tot = 0, mod_cum;
2195         struct timespec64 now;
2196         int i;
2197
2198         ktime_get_real_ts64(&now);
2199
2200         spin_lock(&cli->cl_mod_rpcs_lock);
2201
2202         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2203                    (s64)now.tv_sec, now.tv_nsec);
2204         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2205                    cli->cl_mod_rpcs_in_flight);
2206
2207         seq_printf(seq, "\n\t\t\tmodify\n");
2208         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2209
2210         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2211
2212         mod_cum = 0;
2213         for (i = 0; i < OBD_HIST_MAX; i++) {
2214                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2215                 mod_cum += mod;
2216                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2217                            i, mod, pct(mod, mod_tot),
2218                            pct(mod_cum, mod_tot));
2219                 if (mod_cum == mod_tot)
2220                         break;
2221         }
2222
2223         spin_unlock(&cli->cl_mod_rpcs_lock);
2224
2225         return 0;
2226 }
2227 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2228
2229 /* The number of modify RPCs sent in parallel is limited
2230  * because the server has a finite number of slots per client to
2231  * store request result and ensure reply reconstruction when needed.
2232  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2233  * that takes into account server limit and cl_max_rpcs_in_flight
2234  * value.
2235  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2236  * one close request is allowed above the maximum.
2237  */
2238 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2239                                                  bool close_req)
2240 {
2241         bool avail;
2242
2243         /* A slot is available if
2244          * - number of modify RPCs in flight is less than the max
2245          * - it's a close RPC and no other close request is in flight
2246          */
2247         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2248                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2249
2250         return avail;
2251 }
2252
2253 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2254                                          bool close_req)
2255 {
2256         bool avail;
2257
2258         spin_lock(&cli->cl_mod_rpcs_lock);
2259         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2260         spin_unlock(&cli->cl_mod_rpcs_lock);
2261         return avail;
2262 }
2263
2264
2265 /* Get a modify RPC slot from the obd client @cli according
2266  * to the kind of operation @opc that is going to be sent
2267  * and the intent @it of the operation if it applies.
2268  * If the maximum number of modify RPCs in flight is reached
2269  * the thread is put to sleep.
2270  * Returns the tag to be set in the request message. Tag 0
2271  * is reserved for non-modifying requests.
2272  */
2273 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2274 {
2275         bool                    close_req = false;
2276         __u16                   i, max;
2277
2278         if (opc == MDS_CLOSE)
2279                 close_req = true;
2280
2281         do {
2282                 spin_lock(&cli->cl_mod_rpcs_lock);
2283                 max = cli->cl_max_mod_rpcs_in_flight;
2284                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2285                         /* there is a slot available */
2286                         cli->cl_mod_rpcs_in_flight++;
2287                         if (close_req)
2288                                 cli->cl_close_rpcs_in_flight++;
2289                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2290                                          cli->cl_mod_rpcs_in_flight);
2291                         /* find a free tag */
2292                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2293                                                 max + 1);
2294                         LASSERT(i < OBD_MAX_RIF_MAX);
2295                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2296                         spin_unlock(&cli->cl_mod_rpcs_lock);
2297                         /* tag 0 is reserved for non-modify RPCs */
2298
2299                         CDEBUG(D_RPCTRACE,
2300                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2301                                cli->cl_import->imp_obd->obd_name,
2302                                i + 1, opc, max);
2303
2304                         return i + 1;
2305                 }
2306                 spin_unlock(&cli->cl_mod_rpcs_lock);
2307
2308                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2309                        "opc %u, max %hu\n",
2310                        cli->cl_import->imp_obd->obd_name, opc, max);
2311
2312                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2313                                           obd_mod_rpc_slot_avail(cli,
2314                                                                  close_req));
2315         } while (true);
2316 }
2317 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2318
2319 /* Put a modify RPC slot from the obd client @cli according
2320  * to the kind of operation @opc that has been sent.
2321  */
2322 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2323 {
2324         bool                    close_req = false;
2325
2326         if (tag == 0)
2327                 return;
2328
2329         if (opc == MDS_CLOSE)
2330                 close_req = true;
2331
2332         spin_lock(&cli->cl_mod_rpcs_lock);
2333         cli->cl_mod_rpcs_in_flight--;
2334         if (close_req)
2335                 cli->cl_close_rpcs_in_flight--;
2336         /* release the tag in the bitmap */
2337         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2338         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2339         spin_unlock(&cli->cl_mod_rpcs_lock);
2340         wake_up(&cli->cl_mod_rpcs_waitq);
2341 }
2342 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2343