HBase一: c#访问hbase组件开发

      HDP2.4安装系列介绍了通过ambari创建hbase集群的过程,但工作中一直采用.net的技术路线,如何去访问基于Java搞的Hbase呢? Hbase提供基于Java的本地API访问,同时扩展了通过 ThriftRest 实现Web访问的API so 决定开发基于.net的 sdk,通过其提供的 rest webAPI 来访问 hbase, 其中c#与Java的数据交互过程采用protobuf协议。

目录:

参考资料
基本原理
c#、java数据交互
hbase filter 实体
WebRequester
hbaseClient

参考资料:

hbase官网 (http://hbase.apache.org/book.html
hbase 源码  github下载)
Microsoft.HBase.Client 源码 github下载)

基本原理:

HBase Rest 是建立在HBase java 客户端基础之上提供的web 服务,示意图如下:

可以通过 start /stop 等命令来启动或停止Hbase的 Rest server 服务,如下:

命令:hbase rest start   默认的方式启动rest服务,端口是8080)
命令:hbase rest start 9000 (这种方式以端口9000方式启动)
命令:hbase-daemon.sh start rest -p 9000

当服务启动的时候,系统内嵌的jetty servlet container启动并部署servlet.服务默认的监听端口为8080,可通过修改hbase 配置文件来替换其它端口。
简单概述需求:将下面列表中的访问及传递参数用c#进行封装

http://192.168.2.21: 为HBase master 对应的IP地址
8080: 是HBase Rest Server对应的端口
yourTable: 操作HBase 数据库的表名
schema/regions/scanner: 约定关键字

c#与java通过protobuf数据交互:

Hbase 为java与其它开发语言通过protobuf进行数据交互制定一个特定的数据结构(见hbase官网REST Protobufs Schema 的结构描述),网上有一堆的工具可根据据protobufs schemal 文件生成java、c#源码。意思是双方都遵守这个数据文件格式,来实现夸平台的数据交互与共享。这个就是做了一个平台无关的文件与平台和语言相关的数据对象之间的适配转化工作,如很多xml解析器一样的原理。
协议文件是.proto为后缀的文件,格式如下代码示例

package org.apache.hadoop.hbase.rest.protobuf.generated;

message TableInfo {
  required string name = 1;
  message Region {
    required string name = 1;
    optional bytes startKey = 2;
    optional bytes endKey = 3;
    optional int64 id = 4;
    optional string location = 5;
  }
  repeated Region regions = 2;
}

package:在Java里面代表这个文件所在的包名,在c#里面代表该文件的命名空间
message:代表一个类;
required: 代表该字段必填;
optional: 代表该字段可选,并可以为其设置默认值

从github上下载window版的转换工具,将解压后包中的ProtoGen.exe.config,protoc.exe,ProtoGen.exeGoogle.ProtocolBuffers.dll文件放到某个新建的文件夹 如:c:zhu)
将hbase 规定的协议文件同时copy至该目录 (hbase源码包中 hbasehbase-restsrcmainesourcesorgapachehadoophbaseestprotobuf  下的文件)
以TableInfoMessage.proto 为例进行说明, windows系统下打开命令提示符,切换至 c:zhu 文件夹下
执行:protoc –descriptor_set_out=TableInfoMessage.protobin –include_imports TableInfoMessage.proto
上述命令之后,c:zhu 文件夹内生成了一个TableInfoMessage.protobin文件
执行:protogen AddressBook.protobin  目录下会生成名为TableInfoMessage.cs文件,这就是生成的c#源码)
当然你可以写一个批处理命令来执行,完成后生成的9个文件引入到你的Visual studio 工程即可使用。

hbase filter 实体:

在hbase读取数据时设置的过滤参数,参照 hbasehbase-clientsrcmainjavaorgapachehadoophbasefilter)源码,用c#翻译一次
完成后如下图
   

WebRequester

  封装http请求 WebRequester 类

public class WebRequester 
    {
        private string url = string.Empty;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="urlString"></param>
        public WebRequesterstring urlString)
        {
            this.url = urlString;
        }

        /// <summary>
        /// Issues the web request.
        /// </summary>
        /// <param name="endpoint">The endpoint.</param>
        /// <param name="method">The method.</param>
        /// <param name="input">The input.</param>
        /// <param name="options">request options</param>
        /// <returns></returns>
        public HttpWebResponse IssueWebRequeststring endpoint, string method, Stream input, RequestOptions options)
        {
            return IssueWebRequestAsyncendpoint, method, input,options).Result;
        }

        /// <summary>
        /// Issues the web request asynchronous.
        /// </summary>
        /// <param name="endpoint">The endpoint.</param>
        /// <param name="method">The method.</param>
        /// <param name="input">The input.</param>
        /// <param name="options">request options</param>
        /// <returns></returns>
        public async Task<HttpWebResponse> IssueWebRequestAsyncstring endpoint, string method, Stream input, RequestOptions options)
        {
            string uri = string.Format"{0}/{1}", this.url, endpoint);
            HttpWebRequest httpWebRequest = HttpWebRequest.CreateHttpuri);
            httpWebRequest.Timeout = options.TimeoutMillis;
            httpWebRequest.PreAuthenticate = true;
            httpWebRequest.Method = method;
            httpWebRequest.ContentType = options.ContentType;

            if options.AdditionalHeaders != null)
            {
                foreach var kv in options.AdditionalHeaders)
                {
                    httpWebRequest.Headers.Addkv.Key, kv.Value);
                }
            }

            if input != null)
            {
                using Stream req = await httpWebRequest.GetRequestStreamAsync))
                {
                    await input.CopyToAsyncreq);
                }
            }

            return await httpWebRequest.GetResponseAsync)) as HttpWebResponse;
        }
    }

View Code

http 操作实体类

public class RequestOptions
    {       
        public string AlternativeEndpoint { get; set; }
        public bool KeepAlive { get; set; }
        public int TimeoutMillis { get; set; }
        public int SerializationBufferSize { get; set; }
        public int ReceiveBufferSize { get; set; }
        public bool UseNagle { get; set; }
        public int Port { get; set; }
        public Dictionary<string, string> AdditionalHeaders { get; set; }
        public string AlternativeHost { get; set; }
        public string ContentType { get; set; }

        public static RequestOptions GetDefaultOptions)
        {
            return new RequestOptions)
            {
                KeepAlive = true,
                TimeoutMillis = 30000,
                ReceiveBufferSize = 1024 * 1024 * 1,
                SerializationBufferSize = 1024 * 1024 * 1,
                UseNagle = false,
                //AlternativeEndpoint = Constants.RestEndpointBase,
                //Port = 443,
                AlternativeEndpoint = string.Empty,
                Port = 8080,
                AlternativeHost = null,
                ContentType = "application/x-protobuf"
            };
        }

    }

View Code

hbaseClient

定义hbase 常用操作接口IHbaseClient(包含基于表的操作以及数据的读写),示例如下

public interface IHBaseClient
    {        

        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<org.apache.hadoop.hbase.rest.protobuf.generated.Version> GetVersionAsyncRequestOptions options = null);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="schema"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<bool> CreateTableAsyncTableSchema schema, RequestOptions options = null);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task DeleteTableAsyncstring table, RequestOptions options = null);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<TableInfo> GetTableInfoAsyncstring table, RequestOptions options = null);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<TableSchema> GetTableSchemaAsyncstring table, RequestOptions options = null);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<TableList> ListTablesAsyncRequestOptions options = null);


        /// <summary>
        /// 
        /// </summary>
        /// <param name="tableName"></param>
        /// <param name="scannerSettings"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<ScannerInformation> CreateScannerAsyncstring tableName, Scanner scannerSettings, RequestOptions options);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="scannerInfo"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<CellSet> ScannerGetNextAsyncScannerInformation scannerInfo, RequestOptions options);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="cells"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        Task<bool> StoreCellsAsyncstring table, CellSet cells, RequestOptions options = null);
    }

View Code

实现接口类 HBaseClient

public class HBaseClient : IHBaseClient
    {
        private WebRequester _requester;

        private readonly RequestOptions _globalRequestOptions;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="endPoints"></param>
        /// <param name="globalRequestOptions"></param>
        public HBaseClientstring url, RequestOptions globalRequestOptions = null)
        {
            _globalRequestOptions = globalRequestOptions ?? RequestOptions.GetDefaultOptions);
            _requester = new WebRequesterurl);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<org.apache.hadoop.hbase.rest.protobuf.generated.Version> GetVersionAsyncRequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            return await GetRequestAndDeserializeAsync<org.apache.hadoop.hbase.rest.protobuf.generated.Version>EndPointType.Version, optionToUse);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="schema"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<bool> CreateTableAsyncTableSchema schema, RequestOptions options = null)
        {
            if string.IsNullOrEmptyschema.name))
                throw new ArgumentException"schema.name was either null or empty!", "schema");

            var optionToUse = options ?? _globalRequestOptions;
            string endpoint = string.Format"{0}/{1}", schema.name, EndPointType.Schema);
            using HttpWebResponse webResponse = await PutRequestAsyncendpoint,schema, optionToUse))
            {
                if webResponse.StatusCode == HttpStatusCode.Created)
                {
                    return true;
                }

                // table already exits
                if webResponse.StatusCode == HttpStatusCode.OK)
                {
                    return false;
                }

                // throw the exception otherwise
                using var output = new StreamReaderwebResponse.GetResponseStream)))
                {
                    string message = output.ReadToEnd);
                    throw new WebException
                       string.Format"Couldn't create table {0}! Response code was: {1}, expected either 200 or 201! Response body was: {2}",
                          schema.name,webResponse.StatusCode,message));
                }
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task DeleteTableAsyncstring table, RequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            string endPoint = string.Format"{0}/{1}", table, EndPointType.Schema);

            using HttpWebResponse webResponse = await ExecuteMethodAsync<HttpWebResponse>WebMethod.Delete, endPoint, null, optionToUse))
            {
                if webResponse.StatusCode != HttpStatusCode.OK)
                {
                    using var output = new StreamReaderwebResponse.GetResponseStream)))
                    {
                        string message = output.ReadToEnd);
                        throw new WebException
                            string.Format"Couldn't delete table {0}! Response code was: {1}, expected 200! Response body was: {2}",
                                table, webResponse.StatusCode, message));
                    }
                }
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<TableInfo> GetTableInfoAsyncstring table, RequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            string endPoint = string.Format"{0}/{1}", table, EndPointType.Regions);
            return await GetRequestAndDeserializeAsync<TableInfo>endPoint, optionToUse);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<TableSchema> GetTableSchemaAsyncstring table, RequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            string endPoint = string.Format"{0}/{1}", table, EndPointType.Schema);
            return await GetRequestAndDeserializeAsync<TableSchema>endPoint, optionToUse);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<TableList> ListTablesAsyncRequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            return await GetRequestAndDeserializeAsync<TableList>"", optionToUse);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="tableName"></param>
        /// <param name="scannerSettings"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<ScannerInformation> CreateScannerAsyncstring tableName, Scanner scannerSettings, RequestOptions options)
        {
            string endPoint = string.Format"{0}/{1}", tableName, EndPointType.Scanner);

            using HttpWebResponse response = await ExecuteMethodAsyncWebMethod.Post, endPoint, scannerSettings, options))
            {
                if response.StatusCode != HttpStatusCode.Created)
                {
                    using var output = new StreamReaderresponse.GetResponseStream)))
                    {
                        string message = output.ReadToEnd);
                        throw new WebException
                            string.Format "Couldn't create a scanner for table {0}! Response code was: {1}, expected 201! Response body was: {2}",
                                tableName, response.StatusCode, message));
                    }
                }
                string location = response.Headers.Get"Location");
                if location == null)
                {
                    throw new ArgumentException"Couldn't find header 'Location' in the response!");
                }

                return new ScannerInformationnew Urilocation), tableName, response.Headers);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="scannerInfo"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<CellSet> ScannerGetNextAsyncScannerInformation scannerInfo, RequestOptions options)
        {
            string endPoint = string.Format"{0}/{1}/{2}", scannerInfo.TableName, EndPointType.Scanner, scannerInfo.ScannerId);
            using HttpWebResponse webResponse = await GetRequestAsyncendPoint, options))
            {
                if webResponse.StatusCode == HttpStatusCode.OK)
                {
                    return Serializer.Deserialize<CellSet>webResponse.GetResponseStream));
                }

                return null;
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="table"></param>
        /// <param name="cells"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        public async Task<bool> StoreCellsAsyncstring table, CellSet cells, RequestOptions options = null)
        {
            var optionToUse = options ?? _globalRequestOptions;
            string path = table + "/somefalsekey";
            using HttpWebResponse webResponse = await PutRequestAsyncpath, cells, options))
            {
                if webResponse.StatusCode == HttpStatusCode.NotModified)
                {
                    return false;
                }

                if webResponse.StatusCode != HttpStatusCode.OK)
                {
                    using var output = new StreamReaderwebResponse.GetResponseStream)))
                    {
                        string message = output.ReadToEnd);
                        throw new WebException
                           string.Format"Couldn't insert into table {0}! Response code was: {1}, expected 200! Response body was: {2}",
                              table, webResponse.StatusCode, message));
                    }
                }
            }
            return true;
        }

        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="endpoint"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private async Task<T> GetRequestAndDeserializeAsync<T>string endpoint, RequestOptions options)
        {            
            using WebResponse response = await _requester.IssueWebRequestAsyncendpoint, WebMethod.Get, null, options))
            {
                using Stream responseStream = response.GetResponseStream))
                {
                    return Serializer.Deserialize<T>responseStream);
                }
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="TReq"></typeparam>
        /// <param name="endpoint"></param>
        /// <param name="query"></param>
        /// <param name="request"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private async Task<HttpWebResponse> PutRequestAsync<TReq>string endpoint, TReq request, RequestOptions options)
          where TReq : class
        {
            return await ExecuteMethodAsyncWebMethod.Post, endpoint, request, options);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="TReq"></typeparam>
        /// <param name="method"></param>
        /// <param name="endpoint"></param>
        /// <param name="request"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private async Task<HttpWebResponse> ExecuteMethodAsync<TReq>string method,string endpoint,TReq request,RequestOptions options) where TReq : class
        {
            using var input = new MemoryStreamoptions.SerializationBufferSize))
            {
                if request != null)
                {
                    Serializer.Serializeinput, request);
                }
                input.Seek0, SeekOrigin.Begin);
                return await _requester.IssueWebRequestAsyncendpoint,method, input, options);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="endpoint"></param>
        /// <param name="query"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private async Task<HttpWebResponse> GetRequestAsyncstring endpoint, RequestOptions options)
        {
            return await _requester.IssueWebRequestAsyncendpoint, WebMethod.Get, null, options);
        }
    }

View Code

按步骤完成上面的代码,编译通过即OK,下一篇进入sdk的测试验证之旅

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注