CREATE SOURCE
语法说明
CREATE SOURCE 创建一个对流式数据的连接,并将一个新的 SOURCE 表添加到当前数据库中。
语法结构
CREATE [OR REPLACE] SOURCE [IF NOT EXISTS] stream_name 
( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
WITH ( property_name = expression [, ...]);
语法解释
- stream_name: SOURCE 名称。SOURCE 名称必须与当前数据库中任何现有的 SOURCE 名称不同。
 - column_name: 流式数据映射到 SOURCE 表中的列名。
 - data_type: column_name 对应字段在数据表中的类型。
 - property_name = expression: 有关流式数据映射的具体配置项名以及对应的值,可配置项如下:
 
| property_name | expression 描述 | 
|---|---|
| "type" | 仅支持'kafka':目前仅支持接受的源为 kafka | 
| "topic" | kafka 数据源中对应的 topic | 
| "partion" | kafka 数据源中对应的 partion | 
| "value" | 仅支持'json': 目前仅支持接受的数据格式为 json | 
| "bootstrap.servers" | kafka 服务器对应的 IP:PORT | 
| "sasl.username" | 指定连接到 Kafka 时使用的 SASL(Simple Authentication and Security Layer)用户名 | 
| "sasl.password" | 与 sasl.username 配对使用,这个参数提供了相应的密码 | 
| "sasl.mechanisms" | 客户端和服务器之间认证的 SASL 机制 | 
| "security.protocol" | 指定了与 Kafka 服务器通信时使用的安全协议 | 
示例
create source stream_test(c1 char(25),c2 varchar(500),c3 text,c4 tinytext,c5 mediumtext,c6 longtext )with(
    "type"='kafka',
    "topic"= 'test',
    "partition" = '0',
    "value"= 'json',
    "bootstrap.servers"='127.0.0.1:9092'   
)
Query OK, 0 rows affected (0.01 sec)
限制
SOURCE 表目前不支持 drop 和 alter。
创建 SOURCE 表时目前仅支持连接 kafka,且仅支持传输数据格式为 json。