Whamcloud - gitweb
LU-3319 procfs: move ost proc handling over to seq_file
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OST
43
44 #include <linux/module.h>
45 #include <obd_ost.h>
46 #include <lustre_dlm.h>
47 #include <lprocfs_status.h>
48 #include "ost_internal.h"
49
50 static int oss_num_threads;
51 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
52                 "number of OSS service threads to start");
53
54 static int ost_num_threads;
55 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
56                 "number of OST service threads to start (deprecated)");
57
58 static int oss_num_create_threads;
59 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
60                 "number of OSS create threads to start");
61
62 static char *oss_cpts;
63 CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
64                 "CPU partitions OSS threads should run on");
65
66 static char *oss_io_cpts;
67 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
68                 "CPU partitions OSS IO threads should run on");
69
70 /**
71  * Validate oa from client.
72  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
73  * req are valid.
74  *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
75  *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
76  *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
77  *       whether this is oi_fid or real ostid. So it will check
78  *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
79  *    c. Old FID-disable osc will send IDIF.
80  *    d. new FID-enable osc/osp will send normal FID.
81  *
82  * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
83  * be used for LAST_ID file, and only being accessed inside OST now.
84  */
85 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
86                              struct obd_ioobj *ioobj)
87 {
88         int rc = 0;
89
90         if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
91                      fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
92                 /* Sigh 2.[123] client still sends echo req with oi_id = 0
93                  * during create, and we will reset this to 1, since this
94                  * oi_id is basically useless in the following create process,
95                  * but oi_id == 0 will make it difficult to tell whether it is
96                  * real FID or ost_id. */
97                 oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
98                 oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
99                 oa->o_oi.oi_fid.f_ver = 0;
100         } else {
101                 if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
102                         GOTO(out, rc = -EPROTO);
103
104                 /* Note: this check might be forced in 2.5 or 2.6, i.e.
105                  * all of the requests are required to setup FLGROUP */
106                 if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
107                         ostid_set_seq_mdt0(&oa->o_oi);
108                         if (ioobj)
109                                 ostid_set_seq_mdt0(&ioobj->ioo_oid);
110                         oa->o_valid |= OBD_MD_FLGROUP;
111                 }
112
113                 if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
114                                fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
115                                fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
116                                fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
117                         GOTO(out, rc = -EPROTO);
118         }
119
120         if (ioobj != NULL) {
121                 unsigned max_brw = ioobj_max_brw_get(ioobj);
122
123                 if (unlikely((max_brw & (max_brw - 1)) != 0)) {
124                         CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
125                                ": rc = -EPROTO\n", exp->exp_obd->obd_name,
126                                obd_export_nid2str(exp), max_brw,
127                                POSTID(&oa->o_oi));
128                         GOTO(out, rc = -EPROTO);
129                 }
130                 ioobj->ioo_oid = oa->o_oi;
131         }
132
133 out:
134         if (rc != 0)
135                 CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
136                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
137                        oa ? ostid_seq(&oa->o_oi) : -1,
138                        oa ? ostid_id(&oa->o_oi) : -1, rc);
139         return rc;
140 }
141
142 struct ost_prolong_data {
143         struct ptlrpc_request *opd_req;
144         struct obd_export     *opd_exp;
145         struct obdo           *opd_oa;
146         struct ldlm_res_id     opd_resid;
147         struct ldlm_extent     opd_extent;
148         ldlm_mode_t            opd_mode;
149         unsigned int           opd_locks;
150         int                    opd_timeout;
151 };
152
153 /* prolong locks for the current service time of the corresponding
154  * portal (= OST_IO_PORTAL)
155  */
156 static inline int prolong_timeout(struct ptlrpc_request *req)
157 {
158         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
159
160         if (AT_OFF)
161                 return obd_timeout / 2;
162
163         return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
164                    ldlm_timeout);
165 }
166
167 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
168                                  struct ldlm_lock *lock)
169 {
170         LASSERT(lock->l_export == opd->opd_exp);
171
172         if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
173                 return;
174
175         /* XXX: never try to grab resource lock here because we're inside
176          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
177          * res lock and then exp_bl_list_lock. */
178
179         if (!(lock->l_flags & LDLM_FL_AST_SENT))
180                 /* ignore locks not being cancelled */
181                 return;
182
183         LDLM_DEBUG(lock,
184                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
185                    opd->opd_req->rq_xid, opd->opd_extent.start,
186                    opd->opd_extent.end, opd->opd_timeout);
187
188         /* OK. this is a possible lock the user holds doing I/O
189          * let's refresh eviction timer for it */
190         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
191         ++opd->opd_locks;
192 }
193
194 static void ost_prolong_locks(struct ost_prolong_data *data)
195 {
196         struct obd_export *exp = data->opd_exp;
197         struct obdo       *oa  = data->opd_oa;
198         struct ldlm_lock  *lock;
199         ENTRY;
200
201         if (oa->o_valid & OBD_MD_FLHANDLE) {
202                 /* mostly a request should be covered by only one lock, try
203                  * fast path. */
204                 lock = ldlm_handle2lock(&oa->o_handle);
205                 if (lock != NULL) {
206                         /* Fast path to check if the lock covers the whole IO
207                          * region exclusively. */
208                         if (lock->l_granted_mode == LCK_PW &&
209                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
210                                                 &data->opd_extent)) {
211                                 /* bingo */
212                                 ost_prolong_lock_one(data, lock);
213                                 LDLM_LOCK_PUT(lock);
214                                 RETURN_EXIT;
215                         }
216                         LDLM_LOCK_PUT(lock);
217                 }
218         }
219
220
221         spin_lock_bh(&exp->exp_bl_list_lock);
222         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
223                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
224                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
225
226                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
227                         continue;
228
229                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
230                                          &data->opd_extent))
231                         continue;
232
233                 ost_prolong_lock_one(data, lock);
234         }
235         spin_unlock_bh(&exp->exp_bl_list_lock);
236
237         EXIT;
238 }
239
240 /**
241  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
242  * not.
243  */
244 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
245                                    struct ldlm_lock *lock)
246 {
247         struct niobuf_remote *nb;
248         struct obd_ioobj *ioo;
249         int mode, opc;
250         struct ldlm_extent ext;
251         ENTRY;
252
253         opc = lustre_msg_get_opc(req->rq_reqmsg);
254         LASSERT(opc == OST_READ || opc == OST_WRITE);
255
256         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
257         LASSERT(ioo != NULL);
258
259         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
260         LASSERT(nb != NULL);
261
262         ext.start = nb->offset;
263         nb += ioo->ioo_bufcnt - 1;
264         ext.end = nb->offset + nb->len - 1;
265
266         LASSERT(lock->l_resource != NULL);
267         if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
268                 RETURN(0);
269
270         mode = LCK_PW;
271         if (opc == OST_READ)
272                 mode |= LCK_PR;
273         if (!(lock->l_granted_mode & mode))
274                 RETURN(0);
275
276         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
277 }
278
279 /**
280  * High-priority queue request check for whether the given PTLRPC request (\a
281  * req) is blocking an LDLM lock cancel.
282  *
283  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
284  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
285  *
286  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
287  * function looks only at OST_READs and OST_WRITEs.
288  */
289 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
290 {
291         struct obd_device *obd = req->rq_export->exp_obd;
292         struct ost_body *body;
293         struct obd_ioobj *ioo;
294         struct niobuf_remote *nb;
295         struct ost_prolong_data opd = { 0 };
296         int mode, opc;
297         ENTRY;
298
299         /*
300          * Use LASSERT to do sanity check because malformed RPCs should have
301          * been filtered out in ost_hpreq_handler().
302          */
303         opc = lustre_msg_get_opc(req->rq_reqmsg);
304         LASSERT(opc == OST_READ || opc == OST_WRITE);
305
306         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
307         LASSERT(body != NULL);
308
309         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
310         LASSERT(ioo != NULL);
311
312         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
313         LASSERT(nb != NULL);
314         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
315
316         ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
317
318         opd.opd_req = req;
319         mode = LCK_PW;
320         if (opc == OST_READ)
321                 mode |= LCK_PR;
322         opd.opd_mode = mode;
323         opd.opd_exp = req->rq_export;
324         opd.opd_oa  = &body->oa;
325         opd.opd_extent.start = nb->offset;
326         nb += ioo->ioo_bufcnt - 1;
327         opd.opd_extent.end = nb->offset + nb->len - 1;
328         opd.opd_timeout = prolong_timeout(req);
329
330         DEBUG_REQ(D_RPCTRACE, req,
331                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
332                obd->obd_name, current->comm,
333                opd.opd_resid.name[0], opd.opd_resid.name[1],
334                opd.opd_extent.start, opd.opd_extent.end);
335
336         ost_prolong_locks(&opd);
337
338         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
339                obd->obd_name, opd.opd_locks, req);
340
341         RETURN(opd.opd_locks > 0);
342 }
343
/**
 * hpreq_fini hook for OST_READ/OST_WRITE: run the prolong check one more
 * time and discard its result.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        int refreshed = ost_rw_hpreq_check(req);

        (void)refreshed;
}
348
349 /**
350  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
351  */
352 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
353                                       struct ldlm_lock *lock)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
359         LASSERT(body != NULL);
360
361         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
362             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
363                 RETURN(1);
364
365         RETURN(0);
366 }
367
368 /**
369  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
370  */
371 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
372 {
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct ost_body *body;
375         struct obdo *oa;
376         struct ost_prolong_data opd = { 0 };
377         __u64 start, end;
378         ENTRY;
379
380         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
381         LASSERT(body != NULL);
382
383         oa = &body->oa;
384         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
385                 !(oa->o_flags & OBD_FL_SRVLOCK));
386
387         start = oa->o_size;
388         end = start + oa->o_blocks;
389
390         opd.opd_req = req;
391         opd.opd_mode = LCK_PW;
392         opd.opd_exp = req->rq_export;
393         opd.opd_oa  = oa;
394         opd.opd_extent.start = start;
395         opd.opd_extent.end   = end;
396         if (oa->o_blocks == OBD_OBJECT_EOF)
397                 opd.opd_extent.end = OBD_OBJECT_EOF;
398         opd.opd_timeout = prolong_timeout(req);
399
400         ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
401
402         CDEBUG(D_DLMTRACE,
403                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
404                obd->obd_name,
405                opd.opd_resid.name[0], opd.opd_resid.name[1],
406                opd.opd_extent.start, opd.opd_extent.end);
407
408         ost_prolong_locks(&opd);
409
410         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
411                obd->obd_name, opd.opd_locks, req);
412
413         RETURN(opd.opd_locks > 0);
414 }
415
/**
 * hpreq_fini hook for OST_PUNCH: run the prolong check one more time and
 * discard its result.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        int refreshed = ost_punch_hpreq_check(req);

        (void)refreshed;
}
420
421 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
422         .hpreq_lock_match = ost_rw_hpreq_lock_match,
423         .hpreq_check      = ost_rw_hpreq_check,
424         .hpreq_fini       = ost_rw_hpreq_fini
425 };
426
427 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
428         .hpreq_lock_match = ost_punch_hpreq_lock_match,
429         .hpreq_check      = ost_punch_hpreq_check,
430         .hpreq_fini       = ost_punch_hpreq_fini
431 };
432
433 /** Assign high priority operations to the request if needed. */
434 static int ost_io_hpreq_handler(struct ptlrpc_request *req)
435 {
436         ENTRY;
437         if (req->rq_export) {
438                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
439                 struct ost_body *body;
440
441                 if (opc == OST_READ || opc == OST_WRITE) {
442                         struct niobuf_remote *nb;
443                         struct obd_ioobj *ioo;
444                         int objcount, niocount;
445                         int rc;
446                         int i;
447
448                         /* RPCs on the H-P queue can be inspected before
449                          * ost_handler() initializes their pills, so we
450                          * initialize that here.  Capsule initialization is
451                          * idempotent, as is setting the pill's format (provided
452                          * it doesn't change).
453                          */
454                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
455                         if (opc == OST_READ)
456                                 req_capsule_set(&req->rq_pill,
457                                                 &RQF_OST_BRW_READ);
458                         else
459                                 req_capsule_set(&req->rq_pill,
460                                                 &RQF_OST_BRW_WRITE);
461
462                         body = req_capsule_client_get(&req->rq_pill,
463                                                       &RMF_OST_BODY);
464                         if (body == NULL) {
465                                 CERROR("Missing/short ost_body\n");
466                                 RETURN(-EFAULT);
467                         }
468
469                         objcount = req_capsule_get_size(&req->rq_pill,
470                                                         &RMF_OBD_IOOBJ,
471                                                         RCL_CLIENT) /
472                                                         sizeof(*ioo);
473                         if (objcount == 0) {
474                                 CERROR("Missing/short ioobj\n");
475                                 RETURN(-EFAULT);
476                         }
477                         if (objcount > 1) {
478                                 CERROR("too many ioobjs (%d)\n", objcount);
479                                 RETURN(-EFAULT);
480                         }
481
482                         ioo = req_capsule_client_get(&req->rq_pill,
483                                                      &RMF_OBD_IOOBJ);
484                         if (ioo == NULL) {
485                                 CERROR("Missing/short ioobj\n");
486                                 RETURN(-EFAULT);
487                         }
488
489                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
490                         if (rc) {
491                                 CERROR("invalid object ids\n");
492                                 RETURN(rc);
493                         }
494
495                         for (niocount = i = 0; i < objcount; i++) {
496                                 if (ioo[i].ioo_bufcnt == 0) {
497                                         CERROR("ioo[%d] has zero bufcnt\n", i);
498                                         RETURN(-EFAULT);
499                                 }
500                                 niocount += ioo[i].ioo_bufcnt;
501                         }
502                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
503                                 DEBUG_REQ(D_RPCTRACE, req,
504                                           "bulk has too many pages (%d)",
505                                           niocount);
506                                 RETURN(-EFAULT);
507                         }
508
509                         nb = req_capsule_client_get(&req->rq_pill,
510                                                     &RMF_NIOBUF_REMOTE);
511                         if (nb == NULL) {
512                                 CERROR("Missing/short niobuf\n");
513                                 RETURN(-EFAULT);
514                         }
515
516                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
517                                 req->rq_ops = &ost_hpreq_rw;
518                 } else if (opc == OST_PUNCH) {
519                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
520                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
521
522                         body = req_capsule_client_get(&req->rq_pill,
523                                                       &RMF_OST_BODY);
524                         if (body == NULL) {
525                                 CERROR("Missing/short ost_body\n");
526                                 RETURN(-EFAULT);
527                         }
528
529                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
530                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
531                                 req->rq_ops = &ost_hpreq_punch;
532                 }
533         }
534         RETURN(0);
535 }
536
/* obd_timeout multiplied by 1000. */
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)

/* CPT table for the I/O service; stays NULL unless one is allocated. */
static struct cfs_cpt_table *ost_io_cptable;

#ifdef LPROCFS
/* Read-only seq_file operations for the "uuid" entry. */
LPROC_SEQ_FOPS_RO_TYPE(ost, uuid);

/* Entries registered for the device; the zeroed element ends the list. */
static struct lprocfs_seq_vars lprocfs_ost_obd_vars[] = {
        { "uuid", &ost_uuid_fops },
        { 0 }
};
#endif /* LPROCFS */
549
550 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
551 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
552 {
553         static struct ptlrpc_service_conf       svc_conf;
554         struct ost_obd *ost = &obd->u.ost;
555         nodemask_t              *mask;
556         int rc;
557         ENTRY;
558
559         rc = cfs_cleanup_group_info();
560         if (rc)
561                 RETURN(rc);
562
563 #ifdef LPROCFS
564         obd->obd_vars = lprocfs_ost_obd_vars;
565         lprocfs_seq_obd_setup(obd);
566 #endif
567         mutex_init(&ost->ost_health_mutex);
568
569         svc_conf = (typeof(svc_conf)) {
570                 .psc_name               = LUSTRE_OSS_NAME,
571                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
572                 .psc_buf                = {
573                         .bc_nbufs               = OST_NBUFS,
574                         .bc_buf_size            = OST_BUFSIZE,
575                         .bc_req_max_size        = OST_MAXREQSIZE,
576                         .bc_rep_max_size        = OST_MAXREPSIZE,
577                         .bc_req_portal          = OST_REQUEST_PORTAL,
578                         .bc_rep_portal          = OSC_REPLY_PORTAL,
579                 },
580                 .psc_thr                = {
581                         .tc_thr_name            = "ll_ost",
582                         .tc_thr_factor          = OSS_THR_FACTOR,
583                         .tc_nthrs_init          = OSS_NTHRS_INIT,
584                         .tc_nthrs_base          = OSS_NTHRS_BASE,
585                         .tc_nthrs_max           = OSS_NTHRS_MAX,
586                         .tc_nthrs_user          = oss_num_threads,
587                         .tc_cpu_affinity        = 1,
588                         .tc_ctx_tags            = LCT_DT_THREAD,
589                 },
590                 .psc_cpt                = {
591                         .cc_pattern             = oss_cpts,
592                 },
593                 .psc_ops                = {
594                         .so_req_handler         = tgt_request_handle,
595                         .so_req_printer         = target_print_req,
596                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
597                 },
598         };
599         ost->ost_service = ptlrpc_register_service(&svc_conf,
600                                                    obd->obd_proc_entry);
601         if (IS_ERR(ost->ost_service)) {
602                 rc = PTR_ERR(ost->ost_service);
603                 CERROR("failed to start service: %d\n", rc);
604                 GOTO(out_lprocfs, rc);
605         }
606
607         memset(&svc_conf, 0, sizeof(svc_conf));
608         svc_conf = (typeof(svc_conf)) {
609                 .psc_name               = "ost_create",
610                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
611                 .psc_buf                = {
612                         .bc_nbufs               = OST_NBUFS,
613                         .bc_buf_size            = OST_BUFSIZE,
614                         .bc_req_max_size        = OST_MAXREQSIZE,
615                         .bc_rep_max_size        = OST_MAXREPSIZE,
616                         .bc_req_portal          = OST_CREATE_PORTAL,
617                         .bc_rep_portal          = OSC_REPLY_PORTAL,
618                 },
619                 .psc_thr                = {
620                         .tc_thr_name            = "ll_ost_create",
621                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
622                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
623                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
624                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
625                         .tc_nthrs_user          = oss_num_create_threads,
626                         .tc_cpu_affinity        = 1,
627                         .tc_ctx_tags            = LCT_DT_THREAD,
628                 },
629                 .psc_cpt                = {
630                         .cc_pattern             = oss_cpts,
631                 },
632                 .psc_ops                = {
633                         .so_req_handler         = tgt_request_handle,
634                         .so_req_printer         = target_print_req,
635                 },
636         };
637         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
638                                                           obd->obd_proc_entry);
639         if (IS_ERR(ost->ost_create_service)) {
640                 rc = PTR_ERR(ost->ost_create_service);
641                 CERROR("failed to start OST create service: %d\n", rc);
642                 GOTO(out_service, rc);
643         }
644
645         mask = cfs_cpt_table->ctb_nodemask;
646         /* event CPT feature is disabled in libcfs level by set partition
647          * number to 1, we still want to set node affinity for io service */
648         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
649                 int     cpt = 0;
650                 int     i;
651
652                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
653                 for_each_node_mask(i, *mask) {
654                         if (ost_io_cptable == NULL) {
655                                 CWARN("OSS failed to create CPT table\n");
656                                 break;
657                         }
658
659                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
660                         if (!rc) {
661                                 CWARN("OSS Failed to set node %d for"
662                                       "IO CPT table\n", i);
663                                 cfs_cpt_table_free(ost_io_cptable);
664                                 ost_io_cptable = NULL;
665                                 break;
666                         }
667                 }
668         }
669
670         memset(&svc_conf, 0, sizeof(svc_conf));
671         svc_conf = (typeof(svc_conf)) {
672                 .psc_name               = "ost_io",
673                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
674                 .psc_buf                = {
675                         .bc_nbufs               = OST_NBUFS,
676                         .bc_buf_size            = OST_IO_BUFSIZE,
677                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
678                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
679                         .bc_req_portal          = OST_IO_PORTAL,
680                         .bc_rep_portal          = OSC_REPLY_PORTAL,
681                 },
682                 .psc_thr                = {
683                         .tc_thr_name            = "ll_ost_io",
684                         .tc_thr_factor          = OSS_THR_FACTOR,
685                         .tc_nthrs_init          = OSS_NTHRS_INIT,
686                         .tc_nthrs_base          = OSS_NTHRS_BASE,
687                         .tc_nthrs_max           = OSS_NTHRS_MAX,
688                         .tc_nthrs_user          = oss_num_threads,
689                         .tc_cpu_affinity        = 1,
690                         .tc_ctx_tags            = LCT_DT_THREAD,
691                 },
692                 .psc_cpt                = {
693                         .cc_cptable             = ost_io_cptable,
694                         .cc_pattern             = ost_io_cptable == NULL ?
695                                                   oss_io_cpts : NULL,
696                 },
697                 .psc_ops                = {
698                         .so_thr_init            = tgt_io_thread_init,
699                         .so_thr_done            = tgt_io_thread_done,
700                         .so_req_handler         = tgt_request_handle,
701                         .so_hpreq_handler       = ost_io_hpreq_handler,
702                         .so_req_printer         = target_print_req,
703                 },
704         };
705         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
706                                                       obd->obd_proc_entry);
707         if (IS_ERR(ost->ost_io_service)) {
708                 rc = PTR_ERR(ost->ost_io_service);
709                 CERROR("failed to start OST I/O service: %d\n", rc);
710                 ost->ost_io_service = NULL;
711                 GOTO(out_create, rc);
712         }
713
714         memset(&svc_conf, 0, sizeof(svc_conf));
715         svc_conf = (typeof(svc_conf)) {
716                 .psc_name               = "ost_seq",
717                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
718                 .psc_buf                = {
719                         .bc_nbufs               = OST_NBUFS,
720                         .bc_buf_size            = OST_BUFSIZE,
721                         .bc_req_max_size        = OST_MAXREQSIZE,
722                         .bc_rep_max_size        = OST_MAXREPSIZE,
723                         .bc_req_portal          = SEQ_DATA_PORTAL,
724                         .bc_rep_portal          = OSC_REPLY_PORTAL,
725                 },
726                 .psc_thr                = {
727                         .tc_thr_name            = "ll_ost_seq",
728                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
729                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
730                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
731                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
732                         .tc_nthrs_user          = oss_num_create_threads,
733                         .tc_cpu_affinity        = 1,
734                         .tc_ctx_tags            = LCT_DT_THREAD,
735                 },
736
737                 .psc_cpt                = {
738                         .cc_pattern          = oss_cpts,
739                 },
740                 .psc_ops                = {
741                         .so_req_handler         = tgt_request_handle,
742                         .so_req_printer         = target_print_req,
743                         .so_hpreq_handler       = NULL,
744                 },
745         };
746         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
747                                                       obd->obd_proc_entry);
748         if (IS_ERR(ost->ost_seq_service)) {
749                 rc = PTR_ERR(ost->ost_seq_service);
750                 CERROR("failed to start OST seq service: %d\n", rc);
751                 ost->ost_seq_service = NULL;
752                 GOTO(out_io, rc);
753         }
754
755         /* Object update service */
756         memset(&svc_conf, 0, sizeof(svc_conf));
757         svc_conf = (typeof(svc_conf)) {
758                 .psc_name               = "ost_out",
759                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
760                 .psc_buf                = {
761                         .bc_nbufs               = OST_NBUFS,
762                         .bc_buf_size            = OUT_BUFSIZE,
763                         .bc_req_max_size        = OUT_MAXREQSIZE,
764                         .bc_rep_max_size        = OUT_MAXREPSIZE,
765                         .bc_req_portal          = OUT_PORTAL,
766                         .bc_rep_portal          = OSC_REPLY_PORTAL,
767                 },
768                 /*
769                  * We'd like to have a mechanism to set this on a per-device
770                  * basis, but alas...
771                  */
772                 .psc_thr                = {
773                         .tc_thr_name            = "ll_ost_out",
774                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
775                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
776                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
777                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
778                         .tc_nthrs_user          = oss_num_create_threads,
779                         .tc_cpu_affinity        = 1,
780                         .tc_ctx_tags            = LCT_MD_THREAD |
781                                                   LCT_DT_THREAD,
782                 },
783                 .psc_cpt                = {
784                         .cc_pattern             = oss_cpts,
785                 },
786                 .psc_ops                = {
787                         .so_req_handler         = tgt_request_handle,
788                         .so_req_printer         = target_print_req,
789                         .so_hpreq_handler       = NULL,
790                 },
791         };
792         ost->ost_out_service = ptlrpc_register_service(&svc_conf,
793                                                        obd->obd_proc_entry);
794         if (IS_ERR(ost->ost_out_service)) {
795                 rc = PTR_ERR(ost->ost_out_service);
796                 CERROR("failed to start out service: %d\n", rc);
797                 ost->ost_out_service = NULL;
798                 GOTO(out_seq, rc);
799         }
800
801         ping_evictor_start();
802
803         RETURN(0);
804 out_seq:
805         ptlrpc_unregister_service(ost->ost_seq_service);
806         ost->ost_seq_service = NULL;
807 out_io:
808         ptlrpc_unregister_service(ost->ost_io_service);
809         ost->ost_io_service = NULL;
810 out_create:
811         ptlrpc_unregister_service(ost->ost_create_service);
812         ost->ost_create_service = NULL;
813 out_service:
814         ptlrpc_unregister_service(ost->ost_service);
815         ost->ost_service = NULL;
816 out_lprocfs:
817         lprocfs_obd_cleanup(obd);
818         RETURN(rc);
819 }
820
821 static int ost_cleanup(struct obd_device *obd)
822 {
823         struct ost_obd *ost = &obd->u.ost;
824         int err = 0;
825         ENTRY;
826
827         ping_evictor_stop();
828
829         /* there is no recovery for OST OBD, all recovery is controlled by
830          * obdfilter OBD */
831         LASSERT(obd->obd_recovering == 0);
832         mutex_lock(&ost->ost_health_mutex);
833         ptlrpc_unregister_service(ost->ost_service);
834         ptlrpc_unregister_service(ost->ost_create_service);
835         ptlrpc_unregister_service(ost->ost_io_service);
836         ptlrpc_unregister_service(ost->ost_seq_service);
837         ptlrpc_unregister_service(ost->ost_out_service);
838
839         ost->ost_service = NULL;
840         ost->ost_create_service = NULL;
841         ost->ost_io_service = NULL;
842         ost->ost_seq_service = NULL;
843         ost->ost_out_service = NULL;
844
845         mutex_unlock(&ost->ost_health_mutex);
846
847         lprocfs_obd_cleanup(obd);
848
849         if (ost_io_cptable != NULL) {
850                 cfs_cpt_table_free(ost_io_cptable);
851                 ost_io_cptable = NULL;
852         }
853
854         RETURN(err);
855 }
856
857 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
858 {
859         struct ost_obd *ost = &obd->u.ost;
860         int rc = 0;
861
862         mutex_lock(&ost->ost_health_mutex);
863         rc |= ptlrpc_service_health_check(ost->ost_service);
864         rc |= ptlrpc_service_health_check(ost->ost_create_service);
865         rc |= ptlrpc_service_health_check(ost->ost_io_service);
866         mutex_unlock(&ost->ost_health_mutex);
867
868         /*
869          * health_check to return 0 on healthy
870          * and 1 on unhealthy.
871          */
872         if( rc != 0)
873                 rc = 1;
874
875         return rc;
876 }
877
878 /* use obd ops to offer management infrastructure */
879 static struct obd_ops ost_obd_ops = {
880         .o_owner        = THIS_MODULE,
881         .o_setup        = ost_setup,
882         .o_cleanup      = ost_cleanup,
883         .o_health_check = ost_health_check,
884 };
885
886
887 static int __init ost_init(void)
888 {
889         int rc;
890
891         ENTRY;
892
893         rc = class_register_type(&ost_obd_ops, NULL, NULL,
894 #ifndef HAVE_ONLY_PROCFS_SEQ
895                                 NULL,
896 #endif
897                                 LUSTRE_OSS_NAME, NULL);
898
899         if (ost_num_threads != 0 && oss_num_threads == 0) {
900                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
901                               "use oss_num_threads instead or unset both for "
902                               "dynamic thread startup\n");
903                 oss_num_threads = ost_num_threads;
904         }
905
906         RETURN(rc);
907 }
908
909 static void /*__exit*/ ost_exit(void)
910 {
911         class_unregister_type(LUSTRE_OSS_NAME);
912 }
913
914 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
915 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
916 MODULE_LICENSE("GPL");
917
918 module_init(ost_init);
919 module_exit(ost_exit);