Whamcloud - gitweb
LU-3467 ofd: use unified handler for OST requests
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OST
43
44 #include <linux/module.h>
45 #include <obd_ost.h>
46 #include <lustre_dlm.h>
47 #include <lprocfs_status.h>
48 #include "ost_internal.h"
49
50 static int oss_num_threads;
51 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
52                 "number of OSS service threads to start");
53
54 static int ost_num_threads;
55 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
56                 "number of OST service threads to start (deprecated)");
57
58 static int oss_num_create_threads;
59 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
60                 "number of OSS create threads to start");
61
62 static char *oss_cpts;
63 CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
64                 "CPU partitions OSS threads should run on");
65
66 static char *oss_io_cpts;
67 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
68                 "CPU partitions OSS IO threads should run on");
69
70 /**
71  * Validate oa from client.
72  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
73  * req are valid.
74  *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
75  *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
76  *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
77  *       whether this is oi_fid or real ostid. So it will check
78  *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
79  *    c. Old FID-disable osc will send IDIF.
80  *    d. new FID-enable osc/osp will send normal FID.
81  *
82  * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
83  * be used for LAST_ID file, and only being accessed inside OST now.
84  */
85 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
86                              struct obd_ioobj *ioobj)
87 {
88         int rc = 0;
89
90         if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
91                      fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
92                 /* Sigh 2.[123] client still sends echo req with oi_id = 0
93                  * during create, and we will reset this to 1, since this
94                  * oi_id is basically useless in the following create process,
95                  * but oi_id == 0 will make it difficult to tell whether it is
96                  * real FID or ost_id. */
97                 oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
98                 oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
99                 oa->o_oi.oi_fid.f_ver = 0;
100         } else {
101                 if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
102                         GOTO(out, rc = -EPROTO);
103
104                 /* Note: this check might be forced in 2.5 or 2.6, i.e.
105                  * all of the requests are required to setup FLGROUP */
106                 if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
107                         ostid_set_seq_mdt0(&oa->o_oi);
108                         if (ioobj)
109                                 ostid_set_seq_mdt0(&ioobj->ioo_oid);
110                         oa->o_valid |= OBD_MD_FLGROUP;
111                 }
112
113                 if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
114                                fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
115                                fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
116                                fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
117                         GOTO(out, rc = -EPROTO);
118         }
119
120         if (ioobj != NULL) {
121                 unsigned max_brw = ioobj_max_brw_get(ioobj);
122
123                 if (unlikely((max_brw & (max_brw - 1)) != 0)) {
124                         CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
125                                ": rc = -EPROTO\n", exp->exp_obd->obd_name,
126                                obd_export_nid2str(exp), max_brw,
127                                POSTID(&oa->o_oi));
128                         GOTO(out, rc = -EPROTO);
129                 }
130                 ioobj->ioo_oid = oa->o_oi;
131         }
132
133 out:
134         if (rc != 0)
135                 CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
136                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
137                        oa ? ostid_seq(&oa->o_oi) : -1,
138                        oa ? ostid_id(&oa->o_oi) : -1, rc);
139         return rc;
140 }
141
142 struct ost_prolong_data {
143         struct ptlrpc_request *opd_req;
144         struct obd_export     *opd_exp;
145         struct obdo           *opd_oa;
146         struct ldlm_res_id     opd_resid;
147         struct ldlm_extent     opd_extent;
148         ldlm_mode_t            opd_mode;
149         unsigned int           opd_locks;
150         int                    opd_timeout;
151 };
152
153 /* prolong locks for the current service time of the corresponding
154  * portal (= OST_IO_PORTAL)
155  */
156 static inline int prolong_timeout(struct ptlrpc_request *req)
157 {
158         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
159
160         if (AT_OFF)
161                 return obd_timeout / 2;
162
163         return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
164                    ldlm_timeout);
165 }
166
167 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
168                                  struct ldlm_lock *lock)
169 {
170         LASSERT(lock->l_export == opd->opd_exp);
171
172         if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
173                 return;
174
175         /* XXX: never try to grab resource lock here because we're inside
176          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
177          * res lock and then exp_bl_list_lock. */
178
179         if (!(lock->l_flags & LDLM_FL_AST_SENT))
180                 /* ignore locks not being cancelled */
181                 return;
182
183         LDLM_DEBUG(lock,
184                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
185                    opd->opd_req->rq_xid, opd->opd_extent.start,
186                    opd->opd_extent.end, opd->opd_timeout);
187
188         /* OK. this is a possible lock the user holds doing I/O
189          * let's refresh eviction timer for it */
190         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
191         ++opd->opd_locks;
192 }
193
194 static void ost_prolong_locks(struct ost_prolong_data *data)
195 {
196         struct obd_export *exp = data->opd_exp;
197         struct obdo       *oa  = data->opd_oa;
198         struct ldlm_lock  *lock;
199         ENTRY;
200
201         if (oa->o_valid & OBD_MD_FLHANDLE) {
202                 /* mostly a request should be covered by only one lock, try
203                  * fast path. */
204                 lock = ldlm_handle2lock(&oa->o_handle);
205                 if (lock != NULL) {
206                         /* Fast path to check if the lock covers the whole IO
207                          * region exclusively. */
208                         if (lock->l_granted_mode == LCK_PW &&
209                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
210                                                 &data->opd_extent)) {
211                                 /* bingo */
212                                 ost_prolong_lock_one(data, lock);
213                                 LDLM_LOCK_PUT(lock);
214                                 RETURN_EXIT;
215                         }
216                         LDLM_LOCK_PUT(lock);
217                 }
218         }
219
220
221         spin_lock_bh(&exp->exp_bl_list_lock);
222         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
223                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
224                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
225
226                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
227                         continue;
228
229                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
230                                          &data->opd_extent))
231                         continue;
232
233                 ost_prolong_lock_one(data, lock);
234         }
235         spin_unlock_bh(&exp->exp_bl_list_lock);
236
237         EXIT;
238 }
239
240 /**
241  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
242  * not.
243  */
244 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
245                                    struct ldlm_lock *lock)
246 {
247         struct niobuf_remote *nb;
248         struct obd_ioobj *ioo;
249         int mode, opc;
250         struct ldlm_extent ext;
251         ENTRY;
252
253         opc = lustre_msg_get_opc(req->rq_reqmsg);
254         LASSERT(opc == OST_READ || opc == OST_WRITE);
255
256         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
257         LASSERT(ioo != NULL);
258
259         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
260         LASSERT(nb != NULL);
261
262         ext.start = nb->offset;
263         nb += ioo->ioo_bufcnt - 1;
264         ext.end = nb->offset + nb->len - 1;
265
266         LASSERT(lock->l_resource != NULL);
267         if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
268                 RETURN(0);
269
270         mode = LCK_PW;
271         if (opc == OST_READ)
272                 mode |= LCK_PR;
273         if (!(lock->l_granted_mode & mode))
274                 RETURN(0);
275
276         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
277 }
278
279 /**
280  * High-priority queue request check for whether the given PTLRPC request (\a
281  * req) is blocking an LDLM lock cancel.
282  *
283  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
284  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
285  *
286  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
287  * function looks only at OST_READs and OST_WRITEs.
288  */
289 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
290 {
291         struct obd_device *obd = req->rq_export->exp_obd;
292         struct ost_body *body;
293         struct obd_ioobj *ioo;
294         struct niobuf_remote *nb;
295         struct ost_prolong_data opd = { 0 };
296         int mode, opc;
297         ENTRY;
298
299         /*
300          * Use LASSERT to do sanity check because malformed RPCs should have
301          * been filtered out in ost_hpreq_handler().
302          */
303         opc = lustre_msg_get_opc(req->rq_reqmsg);
304         LASSERT(opc == OST_READ || opc == OST_WRITE);
305
306         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
307         LASSERT(body != NULL);
308
309         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
310         LASSERT(ioo != NULL);
311
312         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
313         LASSERT(nb != NULL);
314         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
315
316         ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
317
318         opd.opd_req = req;
319         mode = LCK_PW;
320         if (opc == OST_READ)
321                 mode |= LCK_PR;
322         opd.opd_mode = mode;
323         opd.opd_exp = req->rq_export;
324         opd.opd_oa  = &body->oa;
325         opd.opd_extent.start = nb->offset;
326         nb += ioo->ioo_bufcnt - 1;
327         opd.opd_extent.end = nb->offset + nb->len - 1;
328         opd.opd_timeout = prolong_timeout(req);
329
330         DEBUG_REQ(D_RPCTRACE, req,
331                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
332                obd->obd_name, current->comm,
333                opd.opd_resid.name[0], opd.opd_resid.name[1],
334                opd.opd_extent.start, opd.opd_extent.end);
335
336         ost_prolong_locks(&opd);
337
338         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
339                obd->obd_name, opd.opd_locks, req);
340
341         RETURN(opd.opd_locks > 0);
342 }
343
/**
 * hpreq_fini callback for OST_READ/OST_WRITE requests: run the lock
 * refresh of ost_rw_hpreq_check() once more and ignore its result.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        /* only the side effect (prolonging locks) is wanted here */
        (void)ost_rw_hpreq_check(req);
}
348
349 /**
350  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
351  */
352 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
353                                       struct ldlm_lock *lock)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
359         LASSERT(body != NULL);
360
361         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
362             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
363                 RETURN(1);
364
365         RETURN(0);
366 }
367
368 /**
369  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
370  */
371 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
372 {
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct ost_body *body;
375         struct obdo *oa;
376         struct ost_prolong_data opd = { 0 };
377         __u64 start, end;
378         ENTRY;
379
380         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
381         LASSERT(body != NULL);
382
383         oa = &body->oa;
384         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
385                 !(oa->o_flags & OBD_FL_SRVLOCK));
386
387         start = oa->o_size;
388         end = start + oa->o_blocks;
389
390         opd.opd_req = req;
391         opd.opd_mode = LCK_PW;
392         opd.opd_exp = req->rq_export;
393         opd.opd_oa  = oa;
394         opd.opd_extent.start = start;
395         opd.opd_extent.end   = end;
396         if (oa->o_blocks == OBD_OBJECT_EOF)
397                 opd.opd_extent.end = OBD_OBJECT_EOF;
398         opd.opd_timeout = prolong_timeout(req);
399
400         ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
401
402         CDEBUG(D_DLMTRACE,
403                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
404                obd->obd_name,
405                opd.opd_resid.name[0], opd.opd_resid.name[1],
406                opd.opd_extent.start, opd.opd_extent.end);
407
408         ost_prolong_locks(&opd);
409
410         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
411                obd->obd_name, opd.opd_locks, req);
412
413         RETURN(opd.opd_locks > 0);
414 }
415
/**
 * hpreq_fini callback for OST_PUNCH requests: run the lock refresh of
 * ost_punch_hpreq_check() once more and ignore its result.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        /* only the side effect (prolonging locks) is wanted here */
        (void)ost_punch_hpreq_check(req);
}
420
421 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
422         .hpreq_lock_match = ost_rw_hpreq_lock_match,
423         .hpreq_check      = ost_rw_hpreq_check,
424         .hpreq_fini       = ost_rw_hpreq_fini
425 };
426
427 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
428         .hpreq_lock_match = ost_punch_hpreq_lock_match,
429         .hpreq_check      = ost_punch_hpreq_check,
430         .hpreq_fini       = ost_punch_hpreq_fini
431 };
432
433 /** Assign high priority operations to the request if needed. */
434 static int ost_io_hpreq_handler(struct ptlrpc_request *req)
435 {
436         ENTRY;
437         if (req->rq_export) {
438                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
439                 struct ost_body *body;
440
441                 if (opc == OST_READ || opc == OST_WRITE) {
442                         struct niobuf_remote *nb;
443                         struct obd_ioobj *ioo;
444                         int objcount, niocount;
445                         int rc;
446                         int i;
447
448                         /* RPCs on the H-P queue can be inspected before
449                          * ost_handler() initializes their pills, so we
450                          * initialize that here.  Capsule initialization is
451                          * idempotent, as is setting the pill's format (provided
452                          * it doesn't change).
453                          */
454                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
455                         if (opc == OST_READ)
456                                 req_capsule_set(&req->rq_pill,
457                                                 &RQF_OST_BRW_READ);
458                         else
459                                 req_capsule_set(&req->rq_pill,
460                                                 &RQF_OST_BRW_WRITE);
461
462                         body = req_capsule_client_get(&req->rq_pill,
463                                                       &RMF_OST_BODY);
464                         if (body == NULL) {
465                                 CERROR("Missing/short ost_body\n");
466                                 RETURN(-EFAULT);
467                         }
468
469                         objcount = req_capsule_get_size(&req->rq_pill,
470                                                         &RMF_OBD_IOOBJ,
471                                                         RCL_CLIENT) /
472                                                         sizeof(*ioo);
473                         if (objcount == 0) {
474                                 CERROR("Missing/short ioobj\n");
475                                 RETURN(-EFAULT);
476                         }
477                         if (objcount > 1) {
478                                 CERROR("too many ioobjs (%d)\n", objcount);
479                                 RETURN(-EFAULT);
480                         }
481
482                         ioo = req_capsule_client_get(&req->rq_pill,
483                                                      &RMF_OBD_IOOBJ);
484                         if (ioo == NULL) {
485                                 CERROR("Missing/short ioobj\n");
486                                 RETURN(-EFAULT);
487                         }
488
489                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
490                         if (rc) {
491                                 CERROR("invalid object ids\n");
492                                 RETURN(rc);
493                         }
494
495                         for (niocount = i = 0; i < objcount; i++) {
496                                 if (ioo[i].ioo_bufcnt == 0) {
497                                         CERROR("ioo[%d] has zero bufcnt\n", i);
498                                         RETURN(-EFAULT);
499                                 }
500                                 niocount += ioo[i].ioo_bufcnt;
501                         }
502                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
503                                 DEBUG_REQ(D_RPCTRACE, req,
504                                           "bulk has too many pages (%d)",
505                                           niocount);
506                                 RETURN(-EFAULT);
507                         }
508
509                         nb = req_capsule_client_get(&req->rq_pill,
510                                                     &RMF_NIOBUF_REMOTE);
511                         if (nb == NULL) {
512                                 CERROR("Missing/short niobuf\n");
513                                 RETURN(-EFAULT);
514                         }
515
516                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
517                                 req->rq_ops = &ost_hpreq_rw;
518                 } else if (opc == OST_PUNCH) {
519                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
520                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
521
522                         body = req_capsule_client_get(&req->rq_pill,
523                                                       &RMF_OST_BODY);
524                         if (body == NULL) {
525                                 CERROR("Missing/short ost_body\n");
526                                 RETURN(-EFAULT);
527                         }
528
529                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
530                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
531                                 req->rq_ops = &ost_hpreq_punch;
532                 }
533         }
534         RETURN(0);
535 }
536
/* obd_timeout scaled by 1000 (presumably seconds to milliseconds -
 * TODO confirm against the users of this macro) */
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)

/* private CPT table for the IO service; stays NULL unless ost_setup()
 * allocates one */
static struct cfs_cpt_table *ost_io_cptable;
540
541 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
542 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
543 {
544         static struct ptlrpc_service_conf       svc_conf;
545         struct ost_obd *ost = &obd->u.ost;
546         struct lprocfs_static_vars lvars;
547         nodemask_t              *mask;
548         int rc;
549         ENTRY;
550
551         rc = cfs_cleanup_group_info();
552         if (rc)
553                 RETURN(rc);
554
555         lprocfs_ost_init_vars(&lvars);
556         lprocfs_obd_setup(obd, lvars.obd_vars);
557
558         mutex_init(&ost->ost_health_mutex);
559
560         svc_conf = (typeof(svc_conf)) {
561                 .psc_name               = LUSTRE_OSS_NAME,
562                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
563                 .psc_buf                = {
564                         .bc_nbufs               = OST_NBUFS,
565                         .bc_buf_size            = OST_BUFSIZE,
566                         .bc_req_max_size        = OST_MAXREQSIZE,
567                         .bc_rep_max_size        = OST_MAXREPSIZE,
568                         .bc_req_portal          = OST_REQUEST_PORTAL,
569                         .bc_rep_portal          = OSC_REPLY_PORTAL,
570                 },
571                 .psc_thr                = {
572                         .tc_thr_name            = "ll_ost",
573                         .tc_thr_factor          = OSS_THR_FACTOR,
574                         .tc_nthrs_init          = OSS_NTHRS_INIT,
575                         .tc_nthrs_base          = OSS_NTHRS_BASE,
576                         .tc_nthrs_max           = OSS_NTHRS_MAX,
577                         .tc_nthrs_user          = oss_num_threads,
578                         .tc_cpu_affinity        = 1,
579                         .tc_ctx_tags            = LCT_DT_THREAD,
580                 },
581                 .psc_cpt                = {
582                         .cc_pattern             = oss_cpts,
583                 },
584                 .psc_ops                = {
585                         .so_req_handler         = tgt_request_handle,
586                         .so_req_printer         = target_print_req,
587                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
588                 },
589         };
590         ost->ost_service = ptlrpc_register_service(&svc_conf,
591                                                    obd->obd_proc_entry);
592         if (IS_ERR(ost->ost_service)) {
593                 rc = PTR_ERR(ost->ost_service);
594                 CERROR("failed to start service: %d\n", rc);
595                 GOTO(out_lprocfs, rc);
596         }
597
598         memset(&svc_conf, 0, sizeof(svc_conf));
599         svc_conf = (typeof(svc_conf)) {
600                 .psc_name               = "ost_create",
601                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
602                 .psc_buf                = {
603                         .bc_nbufs               = OST_NBUFS,
604                         .bc_buf_size            = OST_BUFSIZE,
605                         .bc_req_max_size        = OST_MAXREQSIZE,
606                         .bc_rep_max_size        = OST_MAXREPSIZE,
607                         .bc_req_portal          = OST_CREATE_PORTAL,
608                         .bc_rep_portal          = OSC_REPLY_PORTAL,
609                 },
610                 .psc_thr                = {
611                         .tc_thr_name            = "ll_ost_create",
612                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
613                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
614                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
615                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
616                         .tc_nthrs_user          = oss_num_create_threads,
617                         .tc_cpu_affinity        = 1,
618                         .tc_ctx_tags            = LCT_DT_THREAD,
619                 },
620                 .psc_cpt                = {
621                         .cc_pattern             = oss_cpts,
622                 },
623                 .psc_ops                = {
624                         .so_req_handler         = tgt_request_handle,
625                         .so_req_printer         = target_print_req,
626                 },
627         };
628         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
629                                                           obd->obd_proc_entry);
630         if (IS_ERR(ost->ost_create_service)) {
631                 rc = PTR_ERR(ost->ost_create_service);
632                 CERROR("failed to start OST create service: %d\n", rc);
633                 GOTO(out_service, rc);
634         }
635
636         mask = cfs_cpt_table->ctb_nodemask;
637         /* even if the CPT feature is disabled at the libcfs level by setting
638          * partition number to 1, we still want to set node affinity for io service */
639         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
640                 int     cpt = 0;
641                 int     i;
642
643                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
644                 for_each_node_mask(i, *mask) {
645                         if (ost_io_cptable == NULL) {
646                                 CWARN("OSS failed to create CPT table\n");
647                                 break;
648                         }
649
650                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
651                         if (!rc) {
652                                 CWARN("OSS Failed to set node %d for"
653                                       "IO CPT table\n", i);
654                                 cfs_cpt_table_free(ost_io_cptable);
655                                 ost_io_cptable = NULL;
656                                 break;
657                         }
658                 }
659         }
660
661         memset(&svc_conf, 0, sizeof(svc_conf));
662         svc_conf = (typeof(svc_conf)) {
663                 .psc_name               = "ost_io",
664                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
665                 .psc_buf                = {
666                         .bc_nbufs               = OST_NBUFS,
667                         .bc_buf_size            = OST_IO_BUFSIZE,
668                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
669                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
670                         .bc_req_portal          = OST_IO_PORTAL,
671                         .bc_rep_portal          = OSC_REPLY_PORTAL,
672                 },
673                 .psc_thr                = {
674                         .tc_thr_name            = "ll_ost_io",
675                         .tc_thr_factor          = OSS_THR_FACTOR,
676                         .tc_nthrs_init          = OSS_NTHRS_INIT,
677                         .tc_nthrs_base          = OSS_NTHRS_BASE,
678                         .tc_nthrs_max           = OSS_NTHRS_MAX,
679                         .tc_nthrs_user          = oss_num_threads,
680                         .tc_cpu_affinity        = 1,
681                         .tc_ctx_tags            = LCT_DT_THREAD,
682                 },
683                 .psc_cpt                = {
684                         .cc_cptable             = ost_io_cptable,
685                         .cc_pattern             = ost_io_cptable == NULL ?
686                                                   oss_io_cpts : NULL,
687                 },
688                 .psc_ops                = {
689                         .so_thr_init            = tgt_io_thread_init,
690                         .so_thr_done            = tgt_io_thread_done,
691                         .so_req_handler         = tgt_request_handle,
692                         .so_hpreq_handler       = ost_io_hpreq_handler,
693                         .so_req_printer         = target_print_req,
694                 },
695         };
696         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
697                                                       obd->obd_proc_entry);
698         if (IS_ERR(ost->ost_io_service)) {
699                 rc = PTR_ERR(ost->ost_io_service);
700                 CERROR("failed to start OST I/O service: %d\n", rc);
701                 ost->ost_io_service = NULL;
702                 GOTO(out_create, rc);
703         }
704
705         memset(&svc_conf, 0, sizeof(svc_conf));
706         svc_conf = (typeof(svc_conf)) {
707                 .psc_name               = "ost_seq",
708                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
709                 .psc_buf                = {
710                         .bc_nbufs               = OST_NBUFS,
711                         .bc_buf_size            = OST_BUFSIZE,
712                         .bc_req_max_size        = OST_MAXREQSIZE,
713                         .bc_rep_max_size        = OST_MAXREPSIZE,
714                         .bc_req_portal          = SEQ_DATA_PORTAL,
715                         .bc_rep_portal          = OSC_REPLY_PORTAL,
716                 },
717                 .psc_thr                = {
718                         .tc_thr_name            = "ll_ost_seq",
719                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
720                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
721                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
722                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
723                         .tc_nthrs_user          = oss_num_create_threads,
724                         .tc_cpu_affinity        = 1,
725                         .tc_ctx_tags            = LCT_DT_THREAD,
726                 },
727
728                 .psc_cpt                = {
729                         .cc_pattern          = oss_cpts,
730                 },
731                 .psc_ops                = {
732                         .so_req_handler         = tgt_request_handle,
733                         .so_req_printer         = target_print_req,
734                         .so_hpreq_handler       = NULL,
735                 },
736         };
737         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
738                                                       obd->obd_proc_entry);
739         if (IS_ERR(ost->ost_seq_service)) {
740                 rc = PTR_ERR(ost->ost_seq_service);
741                 CERROR("failed to start OST seq service: %d\n", rc);
742                 ost->ost_seq_service = NULL;
743                 GOTO(out_io, rc);
744         }
745
746         /* Object update service */
747         memset(&svc_conf, 0, sizeof(svc_conf));
748         svc_conf = (typeof(svc_conf)) {
749                 .psc_name               = "ost_out",
750                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
751                 .psc_buf                = {
752                         .bc_nbufs               = OST_NBUFS,
753                         .bc_buf_size            = OUT_BUFSIZE,
754                         .bc_req_max_size        = OUT_MAXREQSIZE,
755                         .bc_rep_max_size        = OUT_MAXREPSIZE,
756                         .bc_req_portal          = OUT_PORTAL,
757                         .bc_rep_portal          = OSC_REPLY_PORTAL,
758                 },
759                 /*
760                  * We'd like to have a mechanism to set this on a per-device
761                  * basis, but alas...
762                  */
763                 .psc_thr                = {
764                         .tc_thr_name            = "ll_ost_out",
765                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
766                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
767                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
768                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
769                         .tc_nthrs_user          = oss_num_create_threads,
770                         .tc_cpu_affinity        = 1,
771                         .tc_ctx_tags            = LCT_DT_THREAD,
772                 },
773                 .psc_cpt                = {
774                         .cc_pattern             = oss_cpts,
775                 },
776                 .psc_ops                = {
777                         .so_req_handler         = tgt_request_handle,
778                         .so_req_printer         = target_print_req,
779                         .so_hpreq_handler       = NULL,
780                 },
781         };
782         ost->ost_out_service = ptlrpc_register_service(&svc_conf,
783                                                        obd->obd_proc_entry);
784         if (IS_ERR(ost->ost_out_service)) {
785                 rc = PTR_ERR(ost->ost_out_service);
786                 CERROR("failed to start out service: %d\n", rc);
787                 ost->ost_out_service = NULL;
788                 GOTO(out_seq, rc);
789         }
790
791         ping_evictor_start();
792
793         RETURN(0);
794 out_seq:
795         ptlrpc_unregister_service(ost->ost_seq_service);
796         ost->ost_seq_service = NULL;
797 out_io:
798         ptlrpc_unregister_service(ost->ost_io_service);
799         ost->ost_io_service = NULL;
800 out_create:
801         ptlrpc_unregister_service(ost->ost_create_service);
802         ost->ost_create_service = NULL;
803 out_service:
804         ptlrpc_unregister_service(ost->ost_service);
805         ost->ost_service = NULL;
806 out_lprocfs:
807         lprocfs_obd_cleanup(obd);
808         RETURN(rc);
809 }
810
811 static int ost_cleanup(struct obd_device *obd)
812 {
813         struct ost_obd *ost = &obd->u.ost;
814         int err = 0;
815         ENTRY;
816
817         ping_evictor_stop();
818
819         /* there is no recovery for OST OBD, all recovery is controlled by
820          * obdfilter OBD */
821         LASSERT(obd->obd_recovering == 0);
822         mutex_lock(&ost->ost_health_mutex);
823         ptlrpc_unregister_service(ost->ost_service);
824         ptlrpc_unregister_service(ost->ost_create_service);
825         ptlrpc_unregister_service(ost->ost_io_service);
826         ptlrpc_unregister_service(ost->ost_seq_service);
827         ptlrpc_unregister_service(ost->ost_out_service);
828
829         ost->ost_service = NULL;
830         ost->ost_create_service = NULL;
831         ost->ost_io_service = NULL;
832         ost->ost_seq_service = NULL;
833         ost->ost_out_service = NULL;
834
835         mutex_unlock(&ost->ost_health_mutex);
836
837         lprocfs_obd_cleanup(obd);
838
839         if (ost_io_cptable != NULL) {
840                 cfs_cpt_table_free(ost_io_cptable);
841                 ost_io_cptable = NULL;
842         }
843
844         RETURN(err);
845 }
846
847 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
848 {
849         struct ost_obd *ost = &obd->u.ost;
850         int rc = 0;
851
852         mutex_lock(&ost->ost_health_mutex);
853         rc |= ptlrpc_service_health_check(ost->ost_service);
854         rc |= ptlrpc_service_health_check(ost->ost_create_service);
855         rc |= ptlrpc_service_health_check(ost->ost_io_service);
856         mutex_unlock(&ost->ost_health_mutex);
857
858         /*
859          * health_check to return 0 on healthy
860          * and 1 on unhealthy.
861          */
862         if( rc != 0)
863                 rc = 1;
864
865         return rc;
866 }
867
868 /* use obd ops to offer management infrastructure */
869 static struct obd_ops ost_obd_ops = {
870         .o_owner        = THIS_MODULE,
871         .o_setup        = ost_setup,
872         .o_cleanup      = ost_cleanup,
873         .o_health_check = ost_health_check,
874 };
875
876
877 static int __init ost_init(void)
878 {
879         struct lprocfs_static_vars lvars;
880         int rc;
881
882         ENTRY;
883
884         lprocfs_ost_init_vars(&lvars);
885         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
886                                  LUSTRE_OSS_NAME, NULL);
887
888         if (ost_num_threads != 0 && oss_num_threads == 0) {
889                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
890                               "use oss_num_threads instead or unset both for "
891                               "dynamic thread startup\n");
892                 oss_num_threads = ost_num_threads;
893         }
894
895         RETURN(rc);
896 }
897
898 static void /*__exit*/ ost_exit(void)
899 {
900         class_unregister_type(LUSTRE_OSS_NAME);
901 }
902
903 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
904 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
905 MODULE_LICENSE("GPL");
906
907 module_init(ost_init);
908 module_exit(ost_exit);