阅读更多
一.背景
本文与 demo1 类似,只是提供另外一种实现方式:与 Kafka 的做法类似,通过实现 StreamTableSource 来注册表数据源。
二.代码
/**
 * Event record holding a user id, an item id and a timestamp.
 *
 * <p>No accessors are written out in this class; presumably the {@code @Data} /
 * {@code @ToString} annotations (Lombok — confirm against the file's imports) generate them.
 */
@Data
@ToString
public class UserInfo implements Serializable {

    /** Timestamp of the record; set to the current wall-clock time by the two-argument constructor. */
    private Timestamp pTime;

    /** Identifier of the user. */
    private String userId;

    /** Identifier of the item. */
    private String itemId;

    /** Creates an empty record with every field left {@code null}. */
    public UserInfo() {
    }

    /**
     * Creates a record for the given user and item, stamped with the current system time.
     *
     * @param userId identifier of the user
     * @param itemId identifier of the item
     */
    public UserInfo(String userId, String itemId) {
        final long nowMillis = System.currentTimeMillis();
        this.pTime = new Timestamp(nowMillis);
        this.userId = userId;
        this.itemId = itemId;
    }
}
/**
 * Stream table source producing {@link UserInfo} records, with the {@code pTime} field
 * declared as the rowtime attribute.
 */
public class UserTableSource implements StreamTableSource<UserInfo>, DefinedRowtimeAttributes {

    /**
     * Describes the record type emitted by this source.
     *
     * @return type information for {@link UserInfo}
     */
    @Override
    public TypeInformation<UserInfo> getReturnType() {
        return TypeInformation.of(UserInfo.class);
    }

    /**
     * Derives the table schema from the record type returned by {@link #getReturnType()}.
     *
     * @return the schema of the table backed by this source
     */
    @Override
    public TableSchema getTableSchema() {
        // The schema could also be declared explicitly, e.g.:
        // TableSchema schema = new TableSchema(
        //     new String[]{"pTime","userId","itemId"},
        //     new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.STRING});
        final TypeInformation<UserInfo> recordType = getReturnType();
        return TableSchema.fromTypeInfo(recordType);
    }

    /**
     * @return the fixed label {@code "userSource"} used to describe this source
     */
    @Override
    public String explainSource() {
        return "userSource";
    }

    /**
     * Attaches a new {@code UserDataSource} to the given environment.
     *
     * @param execEnv the execution environment the source is added to
     * @return the stream of records produced by that source
     */
    @Override
    public DataStream<UserInfo> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new UserDataSource());
    }

    /**
     * Declares {@code pTime} as the single rowtime attribute, read from the existing
     * {@code pTime} field and paired with an {@code AscendingTimestamps} watermark strategy.
     *
     * @return a one-element list holding that descriptor
     */
    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        final String rowtimeField = "pTime";
        return Collections.singletonList(
                new RowtimeAttributeDescriptor(
                        rowtimeField,
                        new ExistingField(rowtimeField),
                        new AscendingTimestamps()));
    }
}
/**
 * Demo job: registers {@code UserTableSource} as table {@code test}, counts rows per
 * {@code userId} in 5-second tumbling windows over {@code pTime}, and prints the result rows.
 */
public class UserStreamTableApp {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The query windows on the rowtime attribute, so the job runs in event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Create the table environment and register the custom source under the name "test".
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.registerTableSource("test", new UserTableSource());

        Table result = tEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");

        // Each retract-stream element is (flag, row); only the row is forwarded, the flag is dropped.
        // The operator is parameterized with Row (not a raw type) so the element type stays checked.
        SingleOutputStreamOperator<Row> allClick = tEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                })
                // Lambdas lose their generic output type to erasure, so it is declared explicitly.
                .returns(Row.class);

        // Print the rows; a real sink could be attached here instead.
        allClick.print();
        env.execute("test");
    }
}
0顶
0踩
分享到:
- 32 分钟前
- 浏览 10
- 分类:互联网
- 查看更多
评论
发表评论
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。