运维开发网

连接Couchbase

运维开发网 https://www.qedev.com 2020-05-18 15:49 出处:网络
web.config中增加 <configSections> <sectionGroup name="couchbaseClients"> <section name="couchbase" type="Couchbase.Configuration.Client.Providers.CouchbaseClientSection, Couchbase.NetClient" /> </sectionGroup> </configSections> ……

在 web.config 中增加以下配置:

<!-- Declares the custom section "couchbaseClients/couchbase" and the type that parses it. -->
<configSections>

<sectionGroup name="couchbaseClients">

<section name="couchbase" type="Couchbase.Configuration.Client.Providers.CouchbaseClientSection, Couchbase.NetClient" />

</sectionGroup>

</configSections>

<!-- Client settings read by CbManager.Init through the section path "couchbaseClients/couchbase". -->
<couchbaseClients>

<couchbase useSsl="false">

<!-- Server URIs handed to the client; two nodes are listed. -->
<servers>

<add uri="http://192.168.10.80:8091/pools" />

<add uri="http://192.168.10.81:8091/pools" />

</servers>

<!--
  Bucket definitions. Only "GuotaiTrading" is declared here; its password is empty,
  so CbManager.Init opens it without a password. No "gtSessions" entry is present,
  so with this configuration CbManager.Init does not open a session bucket.
-->
<buckets>

<add name="GuotaiTrading" useSsl="false" password="">

<!-- Connection pool for this bucket: between 1 and 20 connections. -->
<connectionPool name="custom" maxSize="20" minSize="1">

</connectionPool>

</add>

</buckets>

</couchbase>

</couchbaseClients>

 

增加CbManager类:

/// <summary>
/// Static helper around one Couchbase cluster and two buckets: a trade bucket
/// ("GuotaiTrading") and a session bucket ("gtSessions").
/// <see cref="Init"/> must be called before any other member is used; a bucket that is
/// not declared in the configuration is left unopened (null).
/// </summary>
public class CbManager
{
    private static ICluster cluster { get; set; }

    private static IBucket tradeBucket { get; set; }

    private static IBucket sessionBucket { get; set; }

    /// <summary>
    /// Initializes Couchbase.
    /// The configuration section path is read from AppSettings["CouchBaseSection"]; when that
    /// key is absent the default "couchbaseClients/couchbase" is used. When the key is present
    /// but blank, nothing is initialized.
    /// The trade bucket name is "GuotaiTrading" and the session bucket name is "gtSessions";
    /// each bucket is opened only if it is declared in the configuration.
    /// </summary>
    public static void Init()
    {
        const string tradeBucketName = "GuotaiTrading";
        const string sessionBucketName = "gtSessions";

        string strSection = ConfigurationManager.AppSettings["CouchBaseSection"] ?? "couchbaseClients/couchbase";
        if (string.IsNullOrWhiteSpace(strSection))
        {
            return;
        }

        cluster = new Cluster(strSection);
        tradeBucket = OpenConfiguredBucket(tradeBucketName);
        sessionBucket = OpenConfiguredBucket(sessionBucketName);
    }

    /// <summary>
    /// Opens <paramref name="bucketName"/> on the current cluster if the cluster configuration
    /// declares it, passing the configured password only when one is set.
    /// </summary>
    /// <param name="bucketName">Name of the bucket to open.</param>
    /// <returns>The opened bucket, or null when the bucket is not declared in the configuration.</returns>
    private static IBucket OpenConfiguredBucket(string bucketName)
    {
        if (!cluster.Configuration.BucketConfigs.ContainsKey(bucketName))
        {
            return null;
        }

        var strPassword = cluster.Configuration.BucketConfigs[bucketName].Password;
        if (string.IsNullOrWhiteSpace(strPassword))
        {
            return cluster.OpenBucket(bucketName);
        }

        return cluster.OpenBucket(bucketName, strPassword);
    }

    /// <summary>
    /// Closes any open buckets and disposes the cluster. Safe to call more than once and
    /// safe to call when <see cref="Init"/> never ran.
    /// </summary>
    public static void ShutDown()
    {
        if (cluster == null)
        {
            // Buckets are only ever assigned after the cluster, so there is nothing to release.
            return;
        }

        if (tradeBucket != null)
        {
            cluster.CloseBucket(tradeBucket);
            tradeBucket = null;
        }

        if (sessionBucket != null)
        {
            cluster.CloseBucket(sessionBucket);
            sessionBucket = null;
        }

        // Release the cluster itself; previously it was never disposed.
        cluster.Dispose();
        cluster = null;
    }

    #region Private helpers shared by the public wrappers

    /// <summary>
    /// Asynchronously reads a document by id from <paramref name="bucket"/>.
    /// </summary>
    /// <returns>The document value when the operation succeeded; otherwise null.</returns>
    private static async Task<T> BaseGetAsync<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = await bucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>
    /// Reads a document by id from <paramref name="bucket"/>.
    /// </summary>
    /// <returns>The document value when the operation succeeded; otherwise null.</returns>
    private static T BaseGet<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = bucket.Get<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>
    /// Asynchronously upserts a document, choosing the overload that matches which of
    /// <paramref name="cas"/> and <paramref name="expires"/> were supplied.
    /// </summary>
    /// <returns>True when the operation reported success; false on failure or on any exception (which is logged).</returns>
    private static async Task<bool> BaseUpsertAsync<T>(IBucket bucket, string key, T value, ulong? cas = null, TimeSpan? expires = null)
    {
        try
        {
            IOperationResult<T> result;
            if (cas != null && expires != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value, expires.Value);
            }
            else if (cas != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value);
            }
            else if (expires != null)
            {
                result = await bucket.UpsertAsync(key, value, expires.Value);
            }
            else
            {
                result = await bucket.UpsertAsync(key, value);
            }

            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("UpsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Upserts a document, with an expiry when <paramref name="expires"/> is supplied.
    /// </summary>
    /// <returns>True when the operation reported success; false on failure or on any exception (which is logged).</returns>
    private static bool BaseUpsert<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? bucket.Upsert(key, value) : bucket.Upsert(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Upsert Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Asynchronously inserts a document, with an expiry when <paramref name="expires"/> is supplied.
    /// </summary>
    /// <returns>True when the operation reported success; false on failure or on any exception (which is logged).</returns>
    private static async Task<bool> BaseInsertAsync<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? await bucket.InsertAsync(key, value) : await bucket.InsertAsync(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("InsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Removes the document stored under <paramref name="key"/>.
    /// </summary>
    /// <returns>True when the operation reported success; false on failure or on any exception (which is logged).</returns>
    private static bool BaseDelete(IBucket bucket, string key)
    {
        try
        {
            var result = bucket.Remove(key);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Delete Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Calls Touch on the document <paramref name="id"/> with the given expiry.
    /// </summary>
    /// <returns>True when the operation reported success; false on failure or on any exception (which is logged).</returns>
    private static bool BaseTouch(IBucket bucket, string id, TimeSpan expires)
    {
        try
        {
            var result = bucket.Touch(id, expires);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("BaseTouch Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    #endregion

    #region tradeBucket create / read / update / delete

    #region TxId generation

    /// <summary>
    /// Gets a transaction sequence number by incrementing the "MT4_TransactionCounter"
    /// counter in the trade bucket by 1 (initial value 1).
    /// </summary>
    /// <returns>
    /// The value carried by the increment result. A failed increment is logged; the value
    /// of the failed result is still returned so existing callers keep their behaviour.
    /// </returns>
    public static async Task<ulong> GetTxId()
    {
        var result = await tradeBucket.IncrementAsync("MT4_TransactionCounter", 1, 1);
        if (!result.Success)
        {
            // Surface the failure instead of silently handing back the failed result's value.
            Logger.LogError("GetTxId Failure ,message:" + result.Message);
        }

        return result.Value;
    }

    #endregion

    /// <summary>
    /// Builds the view query shared by all query methods.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="page">1-based page number; values below 1 are treated as 1.</param>
    /// <param name="pageLimit">Page size; when 0 or negative no skip/limit is applied.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">When true the query is issued with stale = false.</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <param name="groupLevel">Group level, applied only when <paramref name="reduceFlag"/> is true.</param>
    /// <returns>The configured view query.</returns>
    private static Couchbase.Views.IViewQuery UnionQuery(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false, int? groupLevel = null)
    {
        var view = tradeBucket.CreateQuery(designDoc, viewName);
        view = view.Reduce(reduceFlag);

        if (sortType == SortType.desc)
        {
            // The keys are swapped for descending queries - presumably so callers can always
            // pass the bounds in the same order regardless of direction (confirm with callers).
            if (endKey != null)
            {
                view = view.StartKey(endKey);
            }

            if (startKey != null)
            {
                view = view.EndKey(startKey);
            }

            view = view.Desc();
        }
        else
        {
            if (startKey != null)
            {
                view = view.StartKey(startKey);
            }

            if (endKey != null)
            {
                view = view.EndKey(endKey);
            }
        }

        if (forceUpdate)
        {
            view = view.Stale(Couchbase.Views.StaleState.False);
        }

        if (pageLimit > 0)
        {
            // Clamp the page so that page <= 0 cannot produce a negative skip.
            int safePage = Math.Max(page, 1);
            view = view.Skip((safePage - 1) * pageLimit).Limit(pageLimit);
        }

        if (reduceFlag && groupLevel != null)
        {
            view = view.GroupLevel(groupLevel.Value);
        }

        return view;
    }

    /// <summary>
    /// Asynchronously queries a view and then fetches each row's document by id, one at a time.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <returns>The documents that were fetched successfully; empty when the view query failed.</returns>
    public static async Task<List<T>> QueryAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);

        List<T> resultList = new List<T>();
        if (query.Success && query.Values != null)
        {
            foreach (var row in query.Rows)
            {
                var result = await tradeBucket.GetAsync<T>(row.Id);
                if (result.Success)
                {
                    resultList.Add(result.Value);
                }
            }
        }

        return resultList;
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryAsync{T}"/>: queries a view and fetches each
    /// row's document by id.
    /// </summary>
    /// <returns>The documents that were fetched successfully; empty when the view query failed.</returns>
    public static List<T> Query<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);

        List<T> resultList = new List<T>();
        if (query.Success && query.Values != null)
        {
            foreach (var row in query.Rows)
            {
                var result = tradeBucket.Get<T>(row.Id);
                if (result.Success)
                {
                    resultList.Add(result.Value);
                }
            }
        }

        return resultList;
    }

    /// <summary>
    /// Asynchronously queries a view, then fetches the row documents with synchronous gets
    /// run through Parallel.ForEach (at most 4 at a time).
    /// </summary>
    /// <returns>
    /// A bag of the documents that were fetched successfully; being a ConcurrentBag, it does
    /// not preserve the view's row order.
    /// </returns>
    public static async Task<ConcurrentBag<T>> QueryParallelAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);

        ConcurrentBag<T> resultList = new ConcurrentBag<T>();
        if (query.Success && query.Values != null)
        {
            ParallelOptions options = new ParallelOptions();
            options.MaxDegreeOfParallelism = 4;

            Parallel.ForEach(query.Rows, options, (item) =>
            {
                var result = tradeBucket.Get<T>(item.Id);
                if (result.Success)
                {
                    resultList.Add(result.Value);
                }
            });
        }

        return resultList;
    }

    /// <summary>
    /// Asynchronously queries a view without paging and returns the number of rows returned.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <returns>The row count; 0 when the result carries no rows.</returns>
    public static async Task<int> QueryCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        return query.Rows == null ? 0 : query.Rows.Count();
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryCountAsync"/>.
    /// </summary>
    /// <returns>The row count; 0 when the result carries no rows.</returns>
    public static int QueryCount(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);
        return query.Rows == null ? 0 : query.Rows.Count();
    }

    /// <summary>
    /// Asynchronously reads a record count from a view whose (reduced) value is the count itself.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The first value parsed as an int; 0 when the query failed or returned no value.</returns>
    public static async Task<int> StatisticCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<object>(view);

        int count = 0;
        if (query.Success && query.Values != null)
        {
            // FirstOrDefault yields null both for "no rows" and for a null first value;
            // either way there is nothing to parse.
            var first = query.Values.FirstOrDefault();
            if (first != null)
            {
                count = int.Parse(first.ToString());
            }
        }

        return count;
    }

    /// <summary>
    /// Asynchronously reads the "count" member from a view whose value is a JSON object
    /// (containing count, sum, min, max, sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The count; 0 when the query failed, returned no value, or the value has no "count".</returns>
    public static async Task<int> QueryStatsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<JObject>(view);

        int count = 0;
        if (query.Success && query.Values != null)
        {
            var stats = query.Values.FirstOrDefault();
            JToken t = null;

            // Only convert when the key is really present; a missing key used to cause a
            // NullReferenceException.
            if (stats != null && stats.TryGetValue("count", out t))
            {
                int? parsed = (int?)t;
                if (parsed.HasValue)
                {
                    count = parsed.Value;
                }
            }
        }

        return count;
    }

    /// <summary>
    /// Asynchronously reads the "sum" member from a view whose value is a JSON object
    /// (containing count, sum, min, max, sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The sum; 0 when the query failed, returned no value, or the value has no "sum".</returns>
    public static async Task<decimal> QueryStatsSumAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<JObject>(view);

        decimal sum = 0;
        if (query.Success && query.Values != null)
        {
            var stats = query.Values.FirstOrDefault();
            JToken t = null;

            // Convert the token directly instead of round-tripping through a string, and
            // only when the key is really present.
            if (stats != null && stats.TryGetValue("sum", out t))
            {
                decimal? parsed = (decimal?)t;
                if (parsed.HasValue)
                {
                    sum = parsed.Value;
                }
            }
        }

        return sum;
    }

    /// <summary>
    /// Asynchronously queries a view and returns its raw rows.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional start key.</param>
    /// <param name="endKey">Optional end key.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view to refresh (stale = false).</param>
    /// <param name="reduceFlag">Value passed to the view's reduce option.</param>
    /// <returns>The view rows, or null when the query did not succeed.</returns>
    public static async Task<IEnumerable<Couchbase.Views.ViewRow<object>>> QueryRowsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);

        if (query.Success && query.Values != null)
        {
            return query.Rows;
        }

        // Null (rather than an empty sequence) is kept so callers that test for null still work.
        return null;
    }

    /// <summary>
    /// Asynchronously looks up a document by id in the trade bucket.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="id">Document id.</param>
    /// <returns>The document when the lookup succeeded; otherwise null.</returns>
    public static async Task<T> GetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(tradeBucket, id);
    }

    /// <summary>
    /// Looks up a document by id in the trade bucket.
    /// </summary>
    /// <returns>The document when the lookup succeeded; otherwise null.</returns>
    public static T Get<T>(string id) where T : class
    {
        return BaseGet<T>(tradeBucket, id);
    }

    /// <summary>
    /// Asynchronously looks up a document by id in the trade bucket together with its CAS value.
    /// </summary>
    /// <returns>
    /// A model carrying the value and CAS when the lookup succeeded with a non-null value;
    /// otherwise a default-constructed model.
    /// </returns>
    public static async Task<GetCasModel<T>> GetCasAsync<T>(string id) where T : class
    {
        var queryResult = await tradeBucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success && queryResult.Value != null)
        {
            return new GetCasModel<T> { Value = queryResult.Value, Cas = queryResult.Cas };
        }

        return new GetCasModel<T>();
    }

    /// <summary>
    /// Asynchronously upserts a document into the trade bucket using the supplied CAS value.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static async Task<bool> UpsertCasAsync<T>(string key, T value, ulong cas, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, cas, expires);
    }

    /// <summary>
    /// Asynchronously upserts a document into the trade bucket.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static async Task<bool> UpsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, null, expires);
    }

    /// <summary>
    /// Upserts a document into the trade bucket.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool Upsert<T>(string key, T value, TimeSpan? expires = null)
    {
        return BaseUpsert<T>(tradeBucket, key, value, expires);
    }

    /// <summary>
    /// Asynchronously inserts a document into the trade bucket.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static async Task<bool> InsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseInsertAsync<T>(tradeBucket, key, value, expires);
    }

    /// <summary>
    /// Removes a document from the trade bucket.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool Delete(string key)
    {
        return BaseDelete(tradeBucket, key);
    }

    /// <summary>
    /// Calls Touch on a trade-bucket document with the given expiry.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool Touch(string id, TimeSpan expires)
    {
        return BaseTouch(tradeBucket, id, expires);
    }

    /// <summary>
    /// Checks whether a document with the given id exists in the trade bucket.
    /// </summary>
    public static bool Exists(string id)
    {
        return tradeBucket.Exists(id);
    }

    /// <summary>
    /// Asynchronously checks whether a document with the given id exists in the trade bucket.
    /// </summary>
    public static async Task<bool> ExistsAsync(string id)
    {
        return await tradeBucket.ExistsAsync(id);
    }

    #endregion

    #region sessionBucket create / read / delete

    /// <summary>
    /// Asynchronously looks up a document by id in the session bucket.
    /// </summary>
    /// <returns>The document when the lookup succeeded; otherwise null.</returns>
    public static async Task<T> SessionGetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(sessionBucket, id);
    }

    /// <summary>
    /// Looks up a document by id in the session bucket.
    /// </summary>
    /// <returns>The document when the lookup succeeded; otherwise null.</returns>
    public static T SessionGet<T>(string id) where T : class
    {
        return BaseGet<T>(sessionBucket, id);
    }

    /// <summary>
    /// Upserts a document into the session bucket with the given expiry.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool SessionUpsert<T>(string key, T value, TimeSpan expires)
    {
        return BaseUpsert<T>(sessionBucket, key, value, expires);
    }

    /// <summary>
    /// Asynchronously inserts a document into the session bucket with the given expiry.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static async Task<bool> SessionInsert<T>(string key, T value, TimeSpan expires)
    {
        return await BaseInsertAsync<T>(sessionBucket, key, value, expires);
    }

    /// <summary>
    /// Removes a document from the session bucket.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool SessionDelete(string key)
    {
        return BaseDelete(sessionBucket, key);
    }

    /// <summary>
    /// Calls Touch on a session-bucket document with the given expiry.
    /// </summary>
    /// <returns>True on success; false on failure or exception.</returns>
    public static bool SessionTouch(string id, TimeSpan expires)
    {
        return BaseTouch(sessionBucket, id, expires);
    }

    /// <summary>
    /// Checks whether a document with the given id exists in the session bucket.
    /// </summary>
    public static bool SessionExists(string id)
    {
        return sessionBucket.Exists(id);
    }

    #endregion
}

 

初始化Couchbase:

// Call before any other CbManager member: Init creates the cluster and opens the buckets the other methods use.
CbManager.Init();

0

精彩评论

暂无评论...
验证码 换一张
取 消