import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
/**
 * Flink Table API example: reads JSON messages from the Kafka topic
 * {@code flink-kafka}, upper-cases each message, and writes the result
 * (with the processing timestamp) to the Kafka topic
 * {@code flink-kafka-output}.
 */
public class FlinkKafkaTableExample {

    public static void main(String[] args) throws Exception {
        // Default settings run in streaming mode, which an unbounded
        // Kafka source requires.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: Kafka topic with JSON payloads. The record
        // timestamp is exposed as a metadata column and used for a
        // 5-second bounded-out-of-orderness watermark.
        tableEnv.executeSql(
            "CREATE TABLE kafka_source ("
                + " `message` STRING,"
                + " `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp',"
                + " WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND"
                + ") WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'flink-kafka',"
                + " 'properties.bootstrap.servers' = 'node1:9092',"
                + " 'properties.group.id' = 'flink-table-group',"
                + " 'format' = 'json',"
                + " 'scan.startup.mode' = 'latest-offset'"
                + ")");

        // Sink table: Kafka topic receiving the transformed rows as JSON.
        tableEnv.executeSql(
            "CREATE TABLE kafka_sink ("
                + " `message` STRING,"
                + " `processed_time` TIMESTAMP(3)"
                + ") WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'flink-kafka-output',"
                + " 'properties.bootstrap.servers' = 'node1:9092',"
                + " 'format' = 'json'"
                + ")");

        // Submit the continuous INSERT job.
        TableResult result = tableEnv.executeSql(
            "INSERT INTO kafka_sink "
                + "SELECT UPPER(message), CURRENT_TIMESTAMP "
                + "FROM kafka_source");

        // Prints the submission status row (job ID) for the INSERT job.
        result.print();

        // executeSql() returns as soon as the job is submitted; without
        // blocking here, main() exits immediately and a local
        // (mini-cluster) execution tears the streaming job down. await()
        // keeps this process attached to the job.
        result.await();
    }
}