int count;
ENTRY;
+ /* Return, i.e. cancel nothing, only if ELC is supported (flag in
+ * export) but disabled through procfs (flag in NS).
+ *
+ * This distinguishes from a case when ELC is not supported originally,
+ * when we still want to cancel locks in advance and just cancel them
+ * locally, without sending any RPC. */
+ if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
+ RETURN(0);
+
osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
if (res == NULL)
return 0;
}
+int osc_create(const struct lu_env *env, struct obd_export *exp,
+ struct obdo *oa, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(oa);
+ LASSERT(ea);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ oa->o_flags == OBD_FL_RECREATE_OBJS) {
+ RETURN(osc_real_create(exp, oa, ea, oti));
+ }
+
+ if (!fid_seq_is_mdt(oa->o_seq))
+ RETURN(osc_real_create(exp, oa, ea, oti));
+
+ /* we should not get here anymore */
+ LBUG();
+
+ RETURN(rc);
+}
+
/* Destroy requests can be async always on the client, and we don't even really
* care about the return code since the client cannot do anything at all about
* a destroy failure.
osc_pack_capa(req, body, (struct obd_capa *)capa);
ptlrpc_request_set_replen(req);
- /* don't throttle destroy RPCs for the MDT */
- if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
+ /* If osc_destory is for destroying the unlink orphan,
+ * sent from MDT to OST, which should not be blocked here,
+ * because the process might be triggered by ptlrpcd, and
+ * it is not good to block ptlrpcd thread (b=16006)*/
+ if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
req->rq_interpret_reply = osc_destroy_interpret;
if (!osc_can_send_destroy(cli)) {
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
}
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
- struct brw_page **pga, int opc,
- cksum_type_t cksum_type)
-{
- __u32 cksum;
- int i = 0;
+ struct brw_page **pga, int opc,
+ cksum_type_t cksum_type)
+{
+ __u32 cksum;
+ int i = 0;
+ struct cfs_crypto_hash_desc *hdesc;
+ unsigned int bufsize;
+ int err;
+ unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
+
+ LASSERT(pg_count > 0);
+
+ hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+ if (IS_ERR(hdesc)) {
+ CERROR("Unable to initialize checksum hash %s\n",
+ cfs_crypto_hash_name(cfs_alg));
+ return PTR_ERR(hdesc);
+ }
- LASSERT (pg_count > 0);
- cksum = init_checksum(cksum_type);
- while (nob > 0 && pg_count > 0) {
- unsigned char *ptr = cfs_kmap(pga[i]->pg);
- int off = pga[i]->off & ~CFS_PAGE_MASK;
- int count = pga[i]->count > nob ? nob : pga[i]->count;
-
- /* corrupt the data before we compute the checksum, to
- * simulate an OST->client data error */
- if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
- memcpy(ptr + off, "bad1", min(4, nob));
- cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
- cfs_kunmap(pga[i]->pg);
- LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
- off, cksum);
+ while (nob > 0 && pg_count > 0) {
+ int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+ /* corrupt the data before we compute the checksum, to
+ * simulate an OST->client data error */
+ if (i == 0 && opc == OST_READ &&
+ OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+ unsigned char *ptr = cfs_kmap(pga[i]->pg);
+ int off = pga[i]->off & ~CFS_PAGE_MASK;
+ memcpy(ptr + off, "bad1", min(4, nob));
+ cfs_kunmap(pga[i]->pg);
+ }
+ cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+ pga[i]->off & ~CFS_PAGE_MASK,
+ count);
+ LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
+ (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
+
+ nob -= pga[i]->count;
+ pg_count--;
+ i++;
+ }
- nob -= pga[i]->count;
- pg_count--;
- i++;
- }
- /* For sending we only compute the wrong checksum instead
- * of corrupting the data so it is still correct on a redo */
- if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
- cksum++;
+ bufsize = 4;
+ err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
+ if (err)
+ cfs_crypto_hash_final(hdesc, NULL, NULL);
+
+ /* For sending we only compute the wrong checksum instead
+ * of corrupting the data so it is still correct on a redo */
+ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+ cksum++;
- return fini_checksum(cksum, cksum_type);
+ return cksum;
}
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
}
req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
ptlrpc_at_set_req_timeout(req);
+ /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
+ * retry logic */
+ req->rq_no_retry_einprogress = 1;
if (opc == OST_WRITE)
desc = ptlrpc_prep_bulk_imp(req, page_count,
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
+ ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
RETURN (rc);
}
-int osc_brw_redo_request(struct ptlrpc_request *request,
- struct osc_brw_async_args *aa)
+static int osc_brw_redo_request(struct ptlrpc_request *request,
+ struct osc_brw_async_args *aa, int rc)
{
struct ptlrpc_request *new_req;
struct osc_brw_async_args *new_aa;
struct osc_async_page *oap;
- int rc = 0;
ENTRY;
- DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
+ "redo for recoverable error %d", rc);
rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
aa->aa_resends++;
new_req->rq_interpret_reply = request->rq_interpret_reply;
new_req->rq_async_args = request->rq_async_args;
- new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+ /* cap resend delay to the current request timeout, this is similar to
+ * what ptlrpc does (see after_reply()) */
+ if (aa->aa_resends > new_req->rq_timeout)
+ new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
+ else
+ new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
new_req->rq_generation_set = 1;
new_req->rq_import_generation = request->rq_import_generation;
aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
} else if (rc == -EINPROGRESS ||
client_should_resend(aa->aa_resends, aa->aa_cli)) {
- rc = osc_brw_redo_request(req, aa);
+ rc = osc_brw_redo_request(req, aa, rc);
} else {
CERROR("%s: too many resent retries for object: "
""LPU64":"LPU64", rc = %d.\n",
LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
lock_res_and_lock(lock);
- cfs_spin_lock(&osc_ast_guard);
+ spin_lock(&osc_ast_guard);
- if (lock->l_ast_data == NULL)
- lock->l_ast_data = data;
- if (lock->l_ast_data == data)
- set = 1;
+ if (lock->l_ast_data == NULL)
+ lock->l_ast_data = data;
+ if (lock->l_ast_data == data)
+ set = 1;
- cfs_spin_unlock(&osc_ast_guard);
- unlock_res_and_lock(lock);
+ spin_unlock(&osc_ast_guard);
+ unlock_res_and_lock(lock);
- return set;
+ return set;
}
static int osc_set_data_with_check(struct lustre_handle *lockh,
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
obd_enqueue_update_f upcall, void *cookie,
- int *flags, int agl, int rc)
+ __u64 *flags, int agl, int rc)
{
int intent = *flags & LDLM_FL_HAS_INTENT;
ENTRY;
__u32 mode;
struct ost_lvb *lvb;
__u32 lvb_len;
- int *flags = aa->oa_flags;
+ __u64 *flags = aa->oa_flags;
/* Make a local copy of a lock handle and a mode, because aa->oa_*
* might be freed anytime after lock upcall has been called. */
* is excluded from the cluster -- such scenarious make the life difficult, so
* release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- int *flags, ldlm_policy_data_t *policy,
+ __u64 *flags, ldlm_policy_data_t *policy,
struct ost_lvb *lvb, int kms_valid,
obd_enqueue_update_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo,
* are explained in lov_enqueue() */
}
- /* We already have a lock, and it's referenced */
+ /* We already have a lock, and it's referenced.
+ *
+ * At this point, the cl_lock::cll_state is CLS_QUEUING,
+ * AGL upcall may change it to CLS_HELD directly. */
(*upcall)(cookie, ELDLM_OK);
if (einfo->ei_mode != mode)
*flags &= ~LDLM_FL_BLOCK_GRANTED;
rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
- sizeof(*lvb), lockh, async);
+ sizeof(*lvb), LVB_T_OST, lockh, async);
if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct obd_statfs *msfs;
- __u64 used;
ENTRY;
if (rc == -EBADR)
GOTO(out, rc = -EPROTO);
}
- /* Reinitialize the RDONLY and DEGRADED flags at the client
- * on each statfs, so they don't stay set permanently. */
- cfs_spin_lock(&cli->cl_oscc.oscc_lock);
-
- if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
- else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
- cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
-
- if (unlikely(msfs->os_state & OS_STATE_READONLY))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
- else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
- cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
-
- /* Add a bit of hysteresis so this flag isn't continually flapping,
- * and ensure that new files don't get extremely fragmented due to
- * only a small amount of available space in the filesystem.
- * We want to set the NOSPC flag when there is less than ~0.1% free
- * and clear it when there is at least ~0.2% free space, so:
- * avail < ~0.1% max max = avail + used
- * 1025 * avail < avail + used used = blocks - free
- * 1024 * avail < used
- * 1024 * avail < blocks - free
- * avail < ((blocks - free) >> 10)
- *
- * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
- * lose that amount of space so in those cases we report no space left
- * if their is less than 1 GB left. */
- used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
- if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
- ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
- else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
- (msfs->os_ffree > 64) &&
- (msfs->os_bavail > (used << 1)))) {
- cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
- OSCC_FLAG_NOSPC_BLK);
- }
-
- if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
- (msfs->os_bavail < used)))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
-
- cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
-
*aa->aa_oi->oi_osfs = *msfs;
out:
rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*Since the request might also come from lprocfs, so we need
*sync this with client_disconnect_export Bug15684*/
- cfs_down_read(&obd->u.cli.cl_sem);
+ down_read(&obd->u.cli.cl_sem);
if (obd->u.cli.cl_import)
imp = class_import_get(obd->u.cli.cl_import);
- cfs_up_read(&obd->u.cli.cl_sem);
+ up_read(&obd->u.cli.cl_sem);
if (!imp)
RETURN(-ENODEV);
RETURN(-EINVAL);
}
-static int osc_setinfo_mds_connect_import(struct obd_import *imp)
-{
- struct llog_ctxt *ctxt;
- int rc = 0;
- ENTRY;
-
- ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt) {
- rc = llog_initiator_connect(ctxt);
- llog_ctxt_put(ctxt);
- } else {
- /* XXX return an error? skip setting below flags? */
- }
-
- cfs_spin_lock(&imp->imp_lock);
- imp->imp_server_timeout = 1;
- imp->imp_pingable = 1;
- cfs_spin_unlock(&imp->imp_lock);
- CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
-
- RETURN(rc);
-}
-
-static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *aa, int rc)
-{
- ENTRY;
- if (rc != 0)
- RETURN(rc);
-
- RETURN(osc_setinfo_mds_connect_import(req->rq_import));
-}
-
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
obd_count keylen, void *key, obd_count vallen,
void *val, struct ptlrpc_request_set *set)
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
- if (KEY_IS(KEY_NEXT_ID)) {
- obd_id new_val;
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- if (vallen != sizeof(obd_id))
- RETURN(-ERANGE);
- if (val == NULL)
- RETURN(-EINVAL);
-
- if (vallen != sizeof(obd_id))
- RETURN(-EINVAL);
-
- /* avoid race between allocate new object and set next id
- * from ll_sync thread */
- cfs_spin_lock(&oscc->oscc_lock);
- new_val = *((obd_id*)val) + 1;
- if (new_val > oscc->oscc_next_id)
- oscc->oscc_next_id = new_val;
- cfs_spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
- exp->exp_obd->obd_name,
- obd->u.cli.cl_oscc.oscc_next_id);
-
- RETURN(0);
- }
-
if (KEY_IS(KEY_CHECKSUM)) {
if (vallen != sizeof(int))
RETURN(-EINVAL);
RETURN(0);
}
+ if (KEY_IS(KEY_CACHE_SET)) {
+ struct client_obd *cli = &obd->u.cli;
+
+ LASSERT(cli->cl_cache == NULL); /* only once */
+ cli->cl_cache = (struct cl_client_cache *)val;
+ cfs_atomic_inc(&cli->cl_cache->ccc_users);
+ cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
+
+ /* add this osc into entity list */
+ LASSERT(cfs_list_empty(&cli->cl_lru_osc));
+ spin_lock(&cli->cl_cache->ccc_lru_lock);
+ cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
+ spin_unlock(&cli->cl_cache->ccc_lru_lock);
+
+ RETURN(0);
+ }
+
+ if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
+ struct client_obd *cli = &obd->u.cli;
+ int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
+ int target = *(int *)val;
+
+ nr = osc_lru_shrink(cli, min(nr, target));
+ *(int *)val -= nr;
+ RETURN(0);
+ }
+
if (!set && !KEY_IS(KEY_GRANT_SHRINK))
RETURN(-EINVAL);
Even if something bad goes through, we'd get a -EINVAL from OST
anyway. */
- if (KEY_IS(KEY_GRANT_SHRINK))
- req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
- else
- req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
-
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT, keylen);
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
- RCL_CLIENT, vallen);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
+ req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
+ &RQF_OST_SET_GRANT_INFO :
+ &RQF_OBD_SET_INFO);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT, keylen);
+ if (!KEY_IS(KEY_GRANT_SHRINK))
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT, vallen);
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- memcpy(tmp, key, keylen);
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ memcpy(tmp, key, keylen);
+ tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
+ &RMF_OST_BODY :
+ &RMF_SETINFO_VAL);
memcpy(tmp, val, vallen);
- if (KEY_IS(KEY_MDS_CONN)) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- oscc->oscc_oa.o_seq = (*(__u32 *)val);
- oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
- LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
- req->rq_no_delay = req->rq_no_resend = 1;
- req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
- } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+ if (KEY_IS(KEY_GRANT_SHRINK)) {
struct osc_grant_args *aa;
struct obdo *oa;
}
-static struct llog_operations osc_size_repl_logops = {
- lop_cancel: llog_obd_repl_cancel
-};
-
-static struct llog_operations osc_mds_ost_orig_logops;
-
-static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
- struct obd_device *tgt, struct llog_catid *catid)
-{
- int rc;
- ENTRY;
-
- rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
- &catid->lci_logid, &osc_mds_ost_orig_logops);
- if (rc) {
- CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
- GOTO(out, rc);
- }
-
- rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
- NULL, &osc_size_repl_logops);
- if (rc) {
- struct llog_ctxt *ctxt =
- llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt)
- llog_cleanup(ctxt);
- CERROR("failed LLOG_SIZE_REPL_CTXT\n");
- }
- GOTO(out, rc);
-out:
- if (rc) {
- CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
- obd->obd_name, tgt->obd_name, catid, rc);
- CERROR("logid "LPX64":0x%x\n",
- catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
- }
- return rc;
-}
-
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
struct obd_device *disk_obd, int *index)
{
- struct llog_catid catid;
- static char name[32] = CATLIST;
- int rc;
- ENTRY;
-
- LASSERT(olg == &obd->obd_olg);
-
- cfs_mutex_lock(&olg->olg_cat_processing);
- rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
- obd->obd_name, *index, catid.lci_logid.lgl_oid,
- catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
-
- rc = __osc_llog_init(obd, olg, disk_obd, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- out:
- cfs_mutex_unlock(&olg->olg_cat_processing);
-
- return rc;
+ /* this code is not supposed to be used with LOD/OSP
+ * to be removed soon */
+ LBUG();
+ return 0;
}
static int osc_llog_finish(struct obd_device *obd, int count)
{
- struct llog_ctxt *ctxt;
- int rc = 0, rc2 = 0;
- ENTRY;
+ struct llog_ctxt *ctxt;
- ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt)
- rc = llog_cleanup(ctxt);
+ ENTRY;
- ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
- if (ctxt)
- rc2 = llog_cleanup(ctxt);
- if (!rc)
- rc = rc2;
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ if (ctxt) {
+ llog_cat_close(NULL, ctxt->loc_handle);
+ llog_cleanup(NULL, ctxt);
+ }
- RETURN(rc);
+ ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ if (ctxt)
+ llog_cleanup(NULL, ctxt);
+ RETURN(0);
}
static int osc_reconnect(const struct lu_env *env,
switch (event) {
case IMP_EVENT_DISCON: {
- /* Only do this on the MDS OSC's */
- if (imp->imp_server_timeout) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- cfs_spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- cfs_spin_unlock(&oscc->oscc_lock);
- }
cli = &obd->u.cli;
client_obd_list_lock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = 0;
break;
}
case IMP_EVENT_ACTIVE: {
- /* Only do this on the MDS OSC's */
- if (imp->imp_server_timeout) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- cfs_spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
- OSCC_FLAG_NOSPC_BLK);
- cfs_spin_unlock(&oscc->oscc_lock);
- }
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
break;
}
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
- struct client_obd *cli = &obd->u.cli;
- int rc;
- ENTRY;
-
- ENTRY;
- rc = ptlrpcd_addref();
- if (rc)
- RETURN(rc);
-
- rc = client_obd_setup(obd, lcfg);
- if (rc == 0) {
- void *handler;
- handler = ptlrpcd_alloc_work(cli->cl_import,
- brw_queue_work, cli);
- if (!IS_ERR(handler))
- cli->cl_writeback_work = handler;
- else
- rc = PTR_ERR(handler);
- }
-
- if (rc == 0) {
- struct lprocfs_static_vars lvars = { 0 };
-
- cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
- lprocfs_osc_init_vars(&lvars);
- if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
- lproc_osc_attach_seqstat(obd);
- sptlrpc_lprocfs_cliobd_attach(obd);
- ptlrpc_lprocfs_register_obd(obd);
- }
-
- oscc_init(obd);
- /* We need to allocate a few requests more, because
- brw_interpret tries to create new requests before freeing
- previous ones. Ideally we want to have 2x max_rpcs_in_flight
- reserved, but I afraid that might be too much wasted RAM
- in fact, so 2 is just my guess and still should work. */
- cli->cl_import->imp_rq_pool =
- ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
- OST_MAXREQSIZE,
- ptlrpc_add_rqs_to_pool);
+ struct lprocfs_static_vars lvars = { 0 };
+ struct client_obd *cli = &obd->u.cli;
+ void *handler;
+ int rc;
+ ENTRY;
- CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+ rc = ptlrpcd_addref();
+ if (rc)
+ RETURN(rc);
+
+ rc = client_obd_setup(obd, lcfg);
+ if (rc)
+ GOTO(out_ptlrpcd, rc);
+
+ handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
+ if (IS_ERR(handler))
+ GOTO(out_client_setup, rc = PTR_ERR(handler));
+ cli->cl_writeback_work = handler;
+
+ rc = osc_quota_setup(obd);
+ if (rc)
+ GOTO(out_ptlrpcd_work, rc);
+
+ cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+ lprocfs_osc_init_vars(&lvars);
+ if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
+ lproc_osc_attach_seqstat(obd);
+ sptlrpc_lprocfs_cliobd_attach(obd);
+ ptlrpc_lprocfs_register_obd(obd);
+ }
- ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
- }
+ /* We need to allocate a few requests more, because
+ * brw_interpret tries to create new requests before freeing
+ * previous ones, Ideally we want to have 2x max_rpcs_in_flight
+ * reserved, but I'm afraid that might be too much wasted RAM
+ * in fact, so 2 is just my guess and still should work. */
+ cli->cl_import->imp_rq_pool =
+ ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+ OST_MAXREQSIZE,
+ ptlrpc_add_rqs_to_pool);
+
+ CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+ ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
+ RETURN(rc);
- if (rc)
- ptlrpcd_decref();
- RETURN(rc);
+out_ptlrpcd_work:
+ ptlrpcd_destroy_work(handler);
+out_client_setup:
+ client_obd_cleanup(obd);
+out_ptlrpcd:
+ ptlrpcd_decref();
+ RETURN(rc);
}
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
ptlrpc_deactivate_import(imp);
- cfs_spin_lock(&imp->imp_lock);
- imp->imp_pingable = 0;
- cfs_spin_unlock(&imp->imp_lock);
+ spin_lock(&imp->imp_lock);
+ imp->imp_pingable = 0;
+ spin_unlock(&imp->imp_lock);
break;
}
case OBD_CLEANUP_EXPORTS: {
int osc_cleanup(struct obd_device *obd)
{
- int rc;
+ struct client_obd *cli = &obd->u.cli;
+ int rc;
- ENTRY;
+ ENTRY;
+
+ /* lru cleanup */
+ if (cli->cl_cache != NULL) {
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
+ spin_lock(&cli->cl_cache->ccc_lru_lock);
+ cfs_list_del_init(&cli->cl_lru_osc);
+ spin_unlock(&cli->cl_cache->ccc_lru_lock);
+ cli->cl_lru_left = NULL;
+ cfs_atomic_dec(&cli->cl_cache->ccc_users);
+ cli->cl_cache = NULL;
+ }
/* free memory of osc quota cache */
osc_quota_cleanup(obd);
.o_statfs_async = osc_statfs_async,
.o_packmd = osc_packmd,
.o_unpackmd = osc_unpackmd,
- .o_precreate = osc_precreate,
.o_create = osc_create,
- .o_create_async = osc_create_async,
.o_destroy = osc_destroy,
.o_getattr = osc_getattr,
.o_getattr_async = osc_getattr_async,
.o_process_config = osc_process_config,
.o_quotactl = osc_quotactl,
.o_quotacheck = osc_quotacheck,
- .o_quota_adjust_qunit = osc_quota_adjust_qunit,
};
extern struct lu_kmem_descr osc_caches[];
-extern cfs_spinlock_t osc_ast_guard;
-extern cfs_lock_class_key_t osc_ast_guard_class;
+extern spinlock_t osc_ast_guard;
+extern struct lock_class_key osc_ast_guard_class;
int __init osc_init(void)
{
lprocfs_osc_init_vars(&lvars);
- osc_quota_init();
rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
LUSTRE_OSC_NAME, &osc_device_type);
if (rc) {
RETURN(rc);
}
- cfs_spin_lock_init(&osc_ast_guard);
- cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
-
- osc_mds_ost_orig_logops = llog_lvfs_ops;
- osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
- osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
- osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
- osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
+ spin_lock_init(&osc_ast_guard);
+ lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
- RETURN(rc);
+ RETURN(rc);
}
#ifdef __KERNEL__
static void /*__exit*/ osc_exit(void)
{
- osc_quota_exit();
class_unregister_type(LUSTRE_OSC_NAME);
lu_kmem_fini(osc_caches);
}